gpt4 book ai didi

java - 当我使用 akka 流在现有消费者组中创建新消费者时,如何寻求 kafka 主题的结束?

转载 作者:行者123 更新时间:2023-11-30 05:35:27 25 4
gpt4 key购买 nike

我有一个 akka 应用程序(JAVA 语言),它使用 commitablePartitionedSource 来消费来自 kafka 主题的消息。我有一些消费者群体,可以激发消费者关注多个主题。这是由动态配置驱动的,我可以暂时关闭消费者,也许稍后再重新启动它们。

当这个消费者重新启动时,我只想阅读新消息,而不是从我上次停下的地方开始。

有没有办法从akka-alpakka消费者获取kafkaConsumer对象,以便我可以在处理之前seekToEnd()?请告诉我是否还有其他方法可以实现这一目标?也许使用 akka 配置或不同类型的消费者?我不想维护自己的偏移量(希望不是唯一的选择)

我的配置设置为在启动消费者组时获取最新偏移量,但由于我要关闭并重新启动单个消费者,它总是从我停止的地方开始消费。

我尝试为某个主题创建一个消费者组,但我有很多主题,而且结果相当耗费资源。我还寻找了一种方法来清除存储在 kafka 中的该主题的偏移量,但没有成功。

最佳答案

最简单的方法就是在每次开始重新启动消费者时创建一个新的消费者组。 Kafka 将在一段可配置的时间(retention.ms)后删除过时的消费者组

如果您很少重新启动消费者并始终希望它处理新数据而不是追赶所有丢失的消息,则此策略很好。

编辑

据我所知,访问底层 KafkaConsumer 的唯一方法是使用 committableExternalSource。这样您就可以访问 seekToEnd 方法,但是您还需要订阅提供每个分区起始偏移量的主题(类似于您现在设置committablePartitionedSource 但在 Akka 之外)。

关于java - 当我使用 akka 流在现有消费者组中创建新消费者时,如何寻求 kafka 主题的结束?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56738191/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com