gpt4 book ai didi

java - 多个消费者引发的Kafka broker内存泄漏

转载 作者:搜寻专家 更新时间:2023-11-01 03:32:48 27 4
gpt4 key购买 nike

我正在构建一个 Java 8 应用程序,它查询 Kafka 主题以获取一条消息。每个请求都会创建一个新的 Consumer 对象(独立于任何现有的 Consumer 对象),该对象轮询我的 Kafka 主题,获取一条记录,而 Consumer 是关闭。这种情况每天发生约 20 万次,并且每个请求都独立于所有其他请求,因此我认为我无法重用消费者。基本上,用户从主题请求一条消息,并为他们创建一个消费者,然后关闭。这种情况平均每秒发生约 2 次,但它是任意的,因此它可能发生 10 次/秒或 1 次/小时,无法知道。

一段时间后,Kafka 服务器(不是运行代码的服务器,而是运行 Kafka 的实际服务器)上的堆大小变得巨大,垃圾收集无法清除它。最终,更多的 CPU 时间专门用于 GC,并且一切都崩溃了,直到我重新启动 Kafka。

这是导致问题的代码的近似版本,while(true) 近似真实行为(在生产中,消费者不是在 while 循环中创建的,而是在-当用户请求来自主题的消息时的需求):

Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
props.put("session.timeout.ms", 30000);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000);

while(true){
Consumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("TOPIC", 0);
consumer.assign(Arrays.asList(tp));
consumer.seekToEnd(Arrays.asList(tp));

// I've narrowed down the memory leak to this line
ConsumerRecords<String, String> cr = consumer.poll(1000);
// If I remove this line ^, the memory leak does not happen

/* CODE TO GET ONE RECORD */

consumer.unsubscribe();
consumer.close();
}

在 20 个 JVM 上运行此代码会在大约 20 分钟内导致内存泄漏。 Kafka 服务器上的堆(蓝色)和 GC 暂停时间(绿色)如下所示: KafkaMemoryLeak

我是不是做错了什么(或者是否有更好的方法来解决这个问题),或者当创建和关闭大量消费者时,这是 Kafka 中的错误吗?

我在客户端运行 Kafka 0.10.2.1,在服务器端运行 Kafka 0.10.2.0。

最佳答案

无论您收到请求的数量和频率如何,您仍然可以重用 KafkaConsumer 实例。您只能在请求到达时进行轮询,但不需要每次都创建和关闭消费者。

话虽如此,如果内存使用量增加并且 GC 未回收,您对消费者的使用可能会揭示代理上的内存管理问题。当生产者被非常频繁地回收时,我已经看到报告代理用完直接内存的问题。所以很可能那里有改进的余地。可能最好在 issues.apache.org 上提交一张票以进行查看。

关于java - 多个消费者引发的Kafka broker内存泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44184795/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com