gpt4 book ai didi

apache-kafka - 如何找出 Kafka 主题的最新偏移量以了解我的读者何时了解主题的最新信息?

转载 作者:行者123 更新时间:2023-12-04 00:11:44 27 4
gpt4 key购买 nike

我有一台服务器需要保留所有用户的内存缓存。因此,假设列表不会很大 - 几十万个项目,我想使用带有键控消息的 Kafka 主题,其中键是 userId 以保持该列表的当前状态,管理应用程序将发送新的用户对象当事情发生变化时转到那个话题。因此,当服务器启动时,它只需要从头开始读取该主题的所有内容并填充它的缓存。

填充阶段大约需要 20-30 秒,具体取决于与 Kafka 的连接,因此服务器不需要在线,直到它从主题中读取所有内容以获得最新的缓存(主题中的所有消息在开始的时刻被认为是最新的)。但是我不知道如何确定我是否从 Kafka 流中读取了所有内容以通知其他服务缓存已填充并且服务器可以启动服务器请求。我读过有关高水印的信息,但没有看到它在 Java 消费者 API 中公开。

那么如何找到 Kafka 主题的最新偏移量以了解我的阅读器何时是最新的?

最佳答案

假设您使用的是高级消费者。

高水印在高级消费者中不可用。

**As you mentioned: all the messages in the topic at the moment of start is considered up-to-date**

当您的应用程序启动时,您可以使用 SimpleConsumer Api 执行以下操作:-

  1. 通过向 kafka 集群中的任何代理发出 TopicMetadataRequest 来查找主题中的分区数。

  2. 创建分区到 latestOffset 映射,其中键是分区,值是该分区中可用的 latestOffset。

    Map offsetMap = new HashMap<>()

  3. 对于主题中的每个分区p:

    一个。找到分区 p 的领导者

    B.向领导者发送 OffsetRequest

    C.从 OffsetResponse 获取最新的 Offset

    D.向 offsetMap 添加一个条目,其中键是分区 p,偏移量是最新偏移量。

  4. 开始使用高级消费者从kafka读取消息:

    一个。对于从 KafkaStream 收到的每条消息:

      AA. Get the partition && offset of the message
    BB. if( offsetMap.get(partition)<=offset) stop Reading from this steam

希望这对您有所帮助。

关于apache-kafka - 如何找出 Kafka 主题的最新偏移量以了解我的读者何时了解主题的最新信息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33770106/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com