gpt4 book ai didi

apache-kafka - Kafka - 获得最新偏移量的最简单方法

转载 作者:行者123 更新时间:2023-12-04 12:40:51 25 4
gpt4 key购买 nike

我正在构建一个应用程序,允许动态添加和删除对 kafka 主题的订阅。添加主题订阅后,我想每小时运行一个批处理作业,以获取所有新消息并将它们推送到另一个数据存储中。

我想了解的是如何获取主题的当前偏移量。添加订阅后,我希望下一个批处理作业获取自订阅的大致时间以来的所有消息。

例如,假设我有一个名为“TopicA”的主题,它不断接收消息。如果我在晚上 7.15 点添加订阅,那么当批处理作业在晚上 8 点运行时,我希望对 7.15 点以来的所有消息进行批处理。我很高兴时间是近似的 - 7.10、7.20 等。 任何一方 5 或 10 分钟都不会让我担心。

所以我打算的解决方案是在添加订阅时获取主题的当前偏移量。我已经看过简单的消费者,但我不想参与这个基本用例的所有集群管理网络方面。

我还研究了高级消费者。我可以这样:

consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset

我对这种方法的担忧是对“head”的调用实际上是一个流。所以我相信它会阻塞等待下一条消息。阻塞是有问题的,因为它可能导致其他订阅排队等待下一条消息到达。

我很高兴花一些时间来实现后一种方法,但是如果有一种更简单的方法不需要我编写容易出错的并发代码,那么我宁愿不浪费时间。

我还需要一种方法来获取自该偏移量以来的所有日志。

最佳答案

对获取请求的每个响应都会返回一个“HighWaterMark”,它表示当前正在使用的分区的日志中的最新偏移量。
因此,理论上您可以获取给定主题的最早消息或实际上任何消息(假设存在),并从响应中提取 HighWaterMark。
这里有关于 HighWaterMark 的更多细节:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

当然,能否从响应中提取 HighWaterMarkOffset 取决于您的客户端是否通过其自己的 Kafka API 提供该数据。

关于apache-kafka - Kafka - 获得最新偏移量的最简单方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27044384/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com