gpt4 book ai didi

tcp - 在 logstash 中以事务方式发送事件

转载 作者:可可西里 更新时间:2023-11-01 02:53:12 26 4
gpt4 key购买 nike

我正在尝试使用 logstash 从 TCP 套接字接收事件,并将它们输出到 Kafka 主题。我当前的配置能够完美地做到这一点,但我希望能够以事务方式向 Kafka 发送事件。我的意思是,系统不应该将事件发送到 kafka,直到收到提交消息:

START TXN 123         --No message sent to Kafka
123 - Event1 Message --No message sent to Kafka
123 - Event2 Message --No message sent to Kafka
123 - Event3 Message --No message sent to Kafka
COMMIT TXN 123 --Event1, Event2, Event3 messages sent to Kafka

是否有可能仅使用 logstash 来实现此目的,或者我是否应该在源和 logstash 之间引入任何其他事务协调器?这是我当前的配置:

input {
tcp {
port => 9000
}
}

output {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "alpayk"
}
}

我尝试使用 logstash 的聚合过滤器来达到这个目的,但我无法以有效的方式结束。

非常感谢您

最佳答案

为此,我最终决定使用 Apache Flume。我修改了它的 netcat 源,以便未提交的消息驻留在 flume 的堆中,一旦收到事务的提交消息,所有消息都会发送到 kafka sink。

我打算将消息存储位置从 flume heap 更改为外部缓存,这样我就可以在事务异常终止或回滚时使存储的消息过期。

下面是我的一段交易逻辑代码:

String eventMessage = new String(body);
int indexOfTrxIdSeparator = eventMessage.indexOf("-");
if (indexOfTrxIdSeparator != -1) {
String txnId = eventMessage.substring(0, indexOfTrxIdSeparator).trim();
String message = eventMessage.substring(indexOfTrxIdSeparator + 1).trim();
ArrayList<Event> events = cachedEvents.get(txnId);

if (message.equals("COMMIT")) {

System.out.println("@@@@@ COMMIT RECEIVED");

if (events != null) {
for (Event eventItem : events) {
ChannelException ex = null;
try {
source.getChannelProcessor().processEvent(eventItem);
} catch (ChannelException chEx) {
ex = chEx;
}

if (ex == null) {
counterGroup.incrementAndGet("events.processed");
} else {
counterGroup.incrementAndGet("events.failed");
logger.warn("Error processing event. Exception follows.", ex);
}
}

cachedEvents.remove(txnId);
}
} else {
System.out.println("@@@@@ MESSAGE RECEIVED: " + message);
if (events == null) {
events = new ArrayList<Event>();
}
events.add(EventBuilder.withBody(message.getBytes()));
cachedEvents.put(txnId, events);
}
}

我将此代码添加到 Flume 的 netcat 源的 processEvents 方法中。我不想使用 Ruby 代码,这就是我决定切换到 Flume 的原因。然而,同样的事情也可以在 logstash 中完成。

谢谢

关于tcp - 在 logstash 中以事务方式发送事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53958341/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com