gpt4 book ai didi

apache-kafka - 如何读取Kafka主题中的所有记录

转载 作者:行者123 更新时间:2023-12-04 15:49:01 26 4
gpt4 key购买 nike

我正在使用 kafka:kafka_2.12-2.1.0,客户端的 spring kafka 并且遇到了问题。

我需要通过读取 kafka 主题中的所有现有消息来加载内存映射。我通过启动一个新的消费者(具有唯一的消费者组 ID 并将偏移量设置为 earliest )来做到这一点。然后我遍历消费者(轮询方法)以获取所有消息并在消费者记录变空时停止。

但我注意到,当我开始轮询时,前几次迭代将消费者记录返回为空,然后开始返回实际记录。现在这打破了我的逻辑,因为我们的代码认为主题中没有记录。

我尝试了其他一些方法(例如使用偏移量),但除了在某处保留另一条记录告诉我该主题中有多少消息需要在我停止之前阅读之外,还没有提出任何解决方案.

有什么想法吗?

最佳答案

据我了解,您想要实现的是根据特定主题中已有的值在您的应用程序中构建 map 。

对于此任务,您可以使用 Ktable 而不是手动轮询主题。在 Kafka Streams DSL 中,它将自动构建一个可读的键值存储,该存储具有容错性,启用复制并自动填充新值。

您可以通过在流上调用 groupByKey 然后使用聚合来简单地做到这一点。

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> myKStream = builder.stream(Serdes.String(), Serdes.Long(), "topic_name");
KTable<String, Long> totalCount = myKStream.groupByKey().aggregate(this::initializer, this::aggregator);

(实际代码可能会因 kafka 版本、您的配置等而异。)

阅读有关 Kafka Stream 概念的更多信息 here

Then I iterate over the consumer (poll method) to get all messages and stop when the consumer records become empty



Kafka 是一个消息流平台。您流式传输的任何数据都在不断更新,您可能不应该以期望消耗在一定数量的消息后停止的方式使用它。停止消费者后,如果有新消息进来,您将如何处理?

此外,您获得空记录的原因可能与记录在不同分区等有关。

您在这里的具体用例是什么?,使用 Kafka 语义本身可能有一个很好的方法来做到这一点。

关于apache-kafka - 如何读取Kafka主题中的所有记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54623123/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com