gpt4 book ai didi

java - 如何暂停 kafka 消费者?

转载 作者:行者123 更新时间:2023-12-04 03:09:27 24 4
gpt4 key购买 nike

我在我的框架中使用了 Kafka 生产者 - 消费者模型。在消费者端消费的记录稍后会被索引到 elasticsearch 上。这里我有一个用例,如果 ES 关闭,我将不得不暂停 kafka 消费者直到 ES 启动,一旦它启动,我需要恢复消费者并从我上次离开的地方消费记录。
我认为@KafkaListener 无法实现这一点。任何人都可以请给我一个解决方案吗?我发现我需要为此编写自己的 KafkaListenerContainer,但我无法正确实现它。任何帮助将非常感激。

最佳答案

有几种可能的解决方案,一种简单的方法是使用 KafkaConsumer API。在 KafkaConsumer 实现中,跟踪将在下一次调用 poll(...) 时检索的主题上的位置。你的问题是你从Kafka获取记录后,可能无法插入到Elastic Search中。在这种情况下,您必须编写一个例程来重置消费者的位置,在您的情况下将是 consumer.seek(partition, consumer.position(partition)-1)。这会将位置重置为较早的位置。此时,一个好的方法是暂停分区(这将使服务器能够进行一些资源清理),然后轮询 ES(通过您想要的任何机制)。一旦 ES 可用,在消费者上调用 resume 并继续您通常的轮询-插入循环。

讨论后编辑

使用指定的生命周期方法创建一个 spring bean。在 bean 的初始化方法中实例化您的 KafkaConsumer(从任何来源检索消费者的配置)。从方法启动一个线程与消费者交互并更新ES,其余设计同上。这是一个单线程模型。为了获得更高的吞吐量,请考虑将从 Kafka 检索到的数据保留在内存队列中的小内存队列中,并使用一个调度程序线程来获取消息并将其提供给池线程以更新 ES。

关于java - 如何暂停 kafka 消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46447119/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com