gpt4 book ai didi

docker - 为来自不同组的Kafka使用者配置相同的偏移量

转载 作者:行者123 更新时间:2023-12-02 19:25:06 25 4
gpt4 key购买 nike

我有ServiceA,它生成DomainChangeEvents并将它们提交到kafka中的主题,然后ServiceB使用kafka主题中的此事件,并将更改应用于存储在内存中的读取模型。 DomainChangeEvent的某些事件是重置事件,而某些重置域是起点。重新启动ServiceB时,我想从上次重置中读取ChangeEvents,然后重新构建域。

ServiceB在docker中作为复制服务午餐。

因为我想要ServiceB的每个副本中的所有ChangeEvent,所以我不能给它们相同的group.id,否则消息将得到负载平衡,而我不会在所有副本中都获得所有事件。重新启动后,如何配置ServiceB从最新的重置事件继续?

我尝试在ServiceB上设置随机group.id并在使用它后提交重置消息,但是重新启动后我使用了不同的group.id,因此所有消息均从头开始再次使用。

考虑过给docker副本提供不同的配置,但是在我阅读docker服务时,所有副本都配置为相同,这不是一个选择。

最佳答案

一种可能的解决方案是,通过将偏移量手动提交到例如数据库中,来存储您希望不同使用者使用的那些点。

如下表:

Topic  Partition  Offset

topicA 0 112
topicA 1 125
topicB 0 2313
topicB 1 2984
topicB 2 2554

这些将是您的“最新重置”点,或您的消费者希望从中开始的职位。正确地说, subscribe()方法的问题在于它取决于 group.id 参数,并且在进行消费者重新平衡和协调游戏。

为了从固定点(或不同分区中的一组点)进行消耗,您应该改为调用 assign()。使用这种方法,您将能够 手动为消费者指定分区列表。没有group.id,没有动态分区分配,也没有偏移量加载,这似乎是您需要的。

分配分区后,应调用 seek()。使用seek,您可以告诉使用者要从 assign()方法上指定的分区开始读取哪个偏移量。

例如,要开始阅读任何主题的“最新重置”,您应该执行以下操作:
//seeking the last offset of topicA's partition0
public void setStartPosition(TopicPartition partition, long offset)
{
consumer.assign(Collections.singletonList(partition)); //f.e-> partition0
consumer.seek(partition, offset); //f.e -> 112
}

调用此方法将使您的使用者完全位于每个分区中的所需位置。我不确定是否要回答您的问题,但希望对您有所帮助!

关于docker - 为来自不同组的Kafka使用者配置相同的偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56787108/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com