gpt4 book ai didi

apache-kafka - Kafka Streams拓扑的处理顺序是否指定?

转载 作者:行者123 更新时间:2023-12-04 10:02:44 25 4
gpt4 key购买 nike

我想知道是否指定了流拓扑处理消息的顺序。

示例:

        // read input messages

KStream<String, String> inputMessages = builder.stream("demo_input_topic_1");
inputMessages = inputMessages.peek((k, v) -> System.out.println("TECHN. NEW MESSAGE: key: " + k + ", value: " + v));

// check if message was already processed

KTable<String, Long> alreadyProcessedMessages = inputMessages.groupByKey().count();
KStream<String, String> newMessages =
inputMessages.leftJoin(alreadyProcessedMessages, (streamValue, tableValue) -> getMessageValueOrNullIfKnownMessage(streamValue, tableValue));
KStream<String, String> filteredNewMessages =
newMessages.filter((key, val) -> val != null).peek((k, v) -> System.out.println("FUNC. NEW MESSAGE: key: " + k + ", value: " + v));

// process the message

filteredNewMessages.map((key, value) -> KeyValue.pair(key, "processed message: " + value))
.peek((k, v) -> System.out.println("PROCESSED MESSAGE: key: " + k + ", value: " + v)).to("demo_output_topic_1");

使用getMessageValueOrNullIfKnownMessage(...):

    private static String getMessageValueOrNullIfKnownMessage(String newMessageValue, Long messageCounter) {
if (messageCounter > 1) {
return null;
}

return newMessageValue;
}

因此示例中只有一个输入和一个输出主题。

输入主题在 alreadyProcessedMessages 中进行计数(从而创建本地状态)。此外,输入主题与计数表 alreadyProcessedMessages 连接,连接结果是流 newMessages(该流中的消息值为 null 如果消息计数 > 1,否则为消息的原始值)。

然后,newMessages 的消息被过滤(null 值被过滤掉),并将结果写入输出主题。

这个最小流的作用是:它将所有消息从输入主题写入具有新 key (之前未处理过的 key )的输出主题。

在测试中,流可以工作。但我认为这并不能保证。它之所以有效,是因为消息在加入之前首先由计数节点处理。

但是该订单有保证吗?

据我在所有文档中看到的,无法保证此处理顺序。因此,如果有新消息到达,也可能会发生这种情况:

  • 消息由“加入节点”处理。
  • 消息由“计数节点”处理。

这当然会产生不同的结果(因此在这种情况下,如果具有相同 key 的消息第二次到达,它仍然会与原始值连接,因为它尚未被计数)。

那么处理顺序是在某处指定的吗?

我知道在新版本的 Kafka 中,KStream-KTable 连接是根据输入分区中消息的时间戳完成的。但这在这里没有帮助,因为拓扑使用相同的输入分区(因为它是相同的消息)。

谢谢

最佳答案

没有任何保证。即使在当前实现中,使用子节点的 List: https://github.com/apache/kafka/blob/3.6/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L267-L269 -- 但是,不能保证子节点按照 DSL 中指定的顺序附加到此列表(因为中间有一个转换层,可能会以不同的顺序添加节点)。此外,实现可能随时发生变化。

我能想到的唯一解决方法(相当昂贵)可能是在重新分区主题中发送流端数据:

KStream<String, String> newMessages =
inputMessages.through(...) // note: as of 2.6.0 release, you could use `repartition()` instead of `through()`
.leftJoin(alreadyProcessedMessages, ...);

这样,KTable 将在执行连接之前更新,因为需要先读回记录。但是,由于您无法保证何时读回记录,因此在完成联接之前可能会对表进行多次更新,这可能会让您处于与以前类似的情况。 (此外,通过其他主题重新路由数据的成本有些昂贵。)

使用处理器 API,您将拥有移动控制权,因为您可以调用 context.forward(..., To.child(...))。但是,对于这种情况,您还需要手动实现聚合和连接:

KStream routing = inputMessages.transform(...);
routing.groupByKey(...);
routing.leftJoin(...);

对于这种情况,您会在 transform() 之后获得您想要避免的重新分区主题:

KStream routing = inputMessages.transform(...);
routing.transform(...); // implement the aggregation
routing.transform(...); // implement the join

连续的 transform()不会触发自动重新分区。

关于apache-kafka - Kafka Streams拓扑的处理顺序是否指定?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61748676/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com