gpt4 book ai didi

java - 在java中限制kafka消费者消息的正确方法

转载 作者:行者123 更新时间:2023-12-05 05:48:02 25 4
gpt4 key购买 nike

我遇到这样一种情况,其中 1 个线程正在快速消费来自 Kafka 主题的消息,并将它们放入阻塞队列,然后在另一个线程中消费,将批量插入写入 mongo 数据库集合。我没有看到很多答案,因为这是一个常见问题,我的应用程序崩溃是因为消息 q 变得太大并且内存不足,因为 mongo db writer 线程跟不上消息消耗率。

配置kafka消费者暂停消息消费一段时间直到消息q恢复到合理大小的正确方法是什么。我可以在泳池循环中暂停一下吗?我不这么认为,否则消费者将被标记为不在线,我可以在每次消息 q 变得太大时关闭 Kafka 消费者,然后在它回到可管理的大小时重新连接吗?我可以,但这似乎不是一个干净的解决方案,我正在寻找的是说“嘿卡夫卡,请暂停向我的活跃消费者发送消息,直到我告诉你恢复”,因为这可以让我以最快的速度提取消息我可以将它们插入到我的数据存储中。

请帮忙!

最佳答案

kafka api中有pause和resume方法 https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(java.util.Collection)

如果您检查 Consumption Flow Control 部分,它说明如下:

Kafka supports dynamic controlling of consumption flows by using pause(Collection) and resume(Collection) to pause the consumption on the specified assigned partitions and resume the consumption on the specified paused partitions respectively in the future poll(long) calls.

关于java - 在java中限制kafka消费者消息的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70884302/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com