gpt4 book ai didi

apache-kafka - 合并kafka流中的记录

转载 作者:行者123 更新时间:2023-12-02 18:39:24 28 4
gpt4 key购买 nike

是否可以合并kafka中的记录并将输出发布到不同的流?

例如,有一个针对 kafka 主题的事件流,如下所示

{txnId:1,startTime:0900},{txnId:1,endTime:0905},{txnId:2,endTime:0912},{txnId:3,endTime:0930},{txnId:2 ,startTime:0912},{txnId:3,startTime:0925}……

我想通过 txnId 合并这些事件并创建如下合并输出

{txnId:1,startTime:0900,endTime:0905},{txnId:2,startTime:0910,endTime:0912},{txnId:3,startTime:0925,endTime:0930}

请注意,传入事件中的顺序不会保持。因此,如果在开始时间事件之前收到 txn Id 的 endTime,那么我们需要等到收到该 txnId 的开始时间事件后再启动合并

我浏览了 Kafka Streams 示例附带的字数示例,但不清楚如何等待事件,然后在进行转换时合并。

任何想法都将受到高度赞赏。

最佳答案

您可以尝试通过将开始和结束事件拆分为 2 个单独的流(以 txnId 作为键),然后加入这两个流来解决此问题。

KStream<String, String> eventSource = new StreamBuilder().stream("INPUT-TOPIC");

KStream<String, JsonNode>[] splitEvents =
eventSource.map((key, eventString) -> {
JsonNode event = new ObjectMapper().readTree(eventString);
String txnId = event.path("txnId").asText();
return KeyValue.pair(txnId, event);
})
.branch((key, event) -> event.findValue("startTime") != null,
(key, event) -> event.findValue("endTime") != null);


KStream<String, JsonNode> startEvents = splitEvents[0];
KStream<String, JsonNode> endEvents = splitEvents[1];

当连接的任一侧都有事件时,如图所示的 2 个流之间的连接将产生连接结果。因此,两个事件的顺序并不重要(您必须确保为连接设置适当的窗口期)。

Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());

KStream<String, String> completeEvents = startEvents.join(endEvents,
(startEvent, endEvent) -> {
// Add logic to merge startEvent and endEvent as seen fit
ObjectNode completeEvent = JsonNodeFactory.instance.objectNode();
completeEvent.put("startTime", startEvent.path("startTime).asText());
completeEvent.put("endTime", endEvent.path("endTime").asText());
return completeEvent.toString();
},
JoinWindows.of(Duration.ofMinutes(15)),
Joined.with(
Serdes.String(), // key
jsonSerde, // left object
jsonSerde // right object
)
);

关于apache-kafka - 合并kafka流中的记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68247147/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com