gpt4 book ai didi

apache-kafka - 确保使用 REST 代理从 Kafka 主题读取所有消息

转载 作者:行者123 更新时间:2023-12-04 12:44:40 25 4
gpt4 key购买 nike

我是 Kafka 的新手,我们的团队正在研究服务间通信的模式。

目标

我们有两个服务,P(生产者)和 C(消费者)。 P 是 C 需要的一组数据的真实来源。当 C 启动时,它需要将 P 中的所有当前数据加载到其缓存中,然后订阅更改通知。 (换句话说,我们希望在服务之间同步数据。)

数据总量比较少,变化不频繁。同步的短暂延迟是可以接受的(最终一致性)。

我们希望将服务解耦,以便 P 和 C 不需要相互了解。

提案

当 P 启动时,它会将其所有数据发布到启用了日志压缩的 Kafka 主题。每条消息都是一个 aggregate带有其 ID 的 key 。

当 C 启动时,它从主题的开头读取所有消息并填充其缓存。然后它继续从其偏移量中读取以获取更新通知。

当 P 更新其数据时,它会为更改的聚合发布消息。 (此消息与原始消息具有相同的架构。)

当 C 收到一条新消息时,它会更新其缓存中的相应数据。

enter image description here

约束

我们正在使用 Confluent REST Proxy与卡夫卡沟通。

问题

当 C 启动时,它如何知道它何时读取了来自主题的所有消息,以便它可以安全地开始处理?

如果 C 没有立即注意到 P 一秒前发送的消息,这是可以接受的。如果 C 在消费 P 一个小时前发送的消息之前开始处理,这是 Not Acceptable 。请注意,我们不知道何时会更新 P 的数据。

我们不希望 C 在消费每条消息后必须等待 REST 代理的轮询间隔。

最佳答案

如果你想找到一个消费者组的结束分区,为了知道你什么时候得到了一个时间点的所有数据,你可以使用

POST /consumers/(string: group_name)/instances/(string: instance)/positions/end

请注意,您必须在该搜索之前进行一次投票 ( GET /consumers/.../records ),但您不需要提交。

如果您不想影响现有消费者组的偏移量,则必须单独发布一个。

然后您可以查询偏移量
GET /consumers/(string: group_name)/instances/(string: instance)/offsets

请注意,在计算结束偏移量和实际到达结束之间可能会有数据写入主题,因此您可能希望进行一些额外的设置,以便在最终到达结束时进行更多的消耗。

关于apache-kafka - 确保使用 REST 代理从 Kafka 主题读取所有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57222357/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com