- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我正在构建一个 Java 8 应用程序,它查询 Kafka 主题以获取一条消息。每个请求都会创建一个新的 Consumer
对象(独立于任何现有的 Consumer
对象),该对象轮询我的 Kafka 主题,获取一条记录,而 Consumer
是关闭。这种情况每天发生约 20 万次,并且每个请求都独立于所有其他请求,因此我认为我无法重用消费者。基本上,用户从主题请求一条消息,并为他们创建一个消费者,然后关闭。这种情况平均每秒发生约 2 次,但它是任意的,因此它可能发生 10 次/秒或 1 次/小时,无法知道。
一段时间后,Kafka 服务器(不是运行代码的服务器,而是运行 Kafka 的实际服务器)上的堆大小变得巨大,垃圾收集无法清除它。最终,更多的 CPU 时间专门用于 GC,并且一切都崩溃了,直到我重新启动 Kafka。
这是导致问题的代码的近似版本,while(true)
近似真实行为(在生产中,消费者不是在 while 循环中创建的,而是在-当用户请求来自主题的消息时的需求):
Properties props = new Properties();
props.put("bootstrap.servers", "SERVER_IP:9092");
props.put("session.timeout.ms", 30000);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000);
while(true){
Consumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("TOPIC", 0);
consumer.assign(Arrays.asList(tp));
consumer.seekToEnd(Arrays.asList(tp));
// I've narrowed down the memory leak to this line
ConsumerRecords<String, String> cr = consumer.poll(1000);
// If I remove this line ^, the memory leak does not happen
/* CODE TO GET ONE RECORD */
consumer.unsubscribe();
consumer.close();
}
在 20 个 JVM 上运行此代码会在大约 20 分钟内导致内存泄漏。 Kafka 服务器上的堆(蓝色)和 GC 暂停时间(绿色)如下所示:
我是不是做错了什么(或者是否有更好的方法来解决这个问题),或者当创建和关闭大量消费者时,这是 Kafka 中的错误吗?
我在客户端运行 Kafka 0.10.2.1,在服务器端运行 Kafka 0.10.2.0。
最佳答案
无论您收到请求的数量和频率如何,您仍然可以重用 KafkaConsumer 实例。您只能在请求到达时进行轮询,但不需要每次都创建和关闭消费者。
话虽如此,如果内存使用量增加并且 GC 未回收,您对消费者的使用可能会揭示代理上的内存管理问题。当生产者被非常频繁地回收时,我已经看到报告代理用完直接内存的问题。所以很可能那里有改进的余地。可能最好在 issues.apache.org 上提交一张票以进行查看。
关于java - 多个消费者引发的Kafka broker内存泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44184795/
我正在使用 SharePoint Online 并使用 Windows Azure 托管访问 SPO 的进程。 我们已将启动任务添加到 Azure 角色以安装 http://www.microsoft
我有一个函数,它获取包含时间的源文件(csv 文件),读取它,然后按顺序对行进行排序并将它们写入目标文件中。但是,如果源 csv 文件不存在,我需要引发 FileNotFoundError。我之前曾引
我试图在目录不存在时引发错误,然后再打开该目录中的文件。根据this response我应该为我的问题使用最具体的异常构造函数,我认为它是 NotADirectoryError。但是运行下面的代码我得
在编码/开发生命的一天或另一天,我们确实遇到了这个特殊的情况,这是最常见的异常(exception)之一。我的问题是关于的而不是。为什么(我知道当我们尝试访问实际上指向null的引用变量的属性时会引发
我想知道在 python 中是否可以在一个 except block 中引发异常并在稍后的 except block 中捕获它。我相信其他一些语言默认会这样做。 这是它的样子" try: som
我有以下代码: br = mechanize.Browser() br._factory.is_html = True br.form = mechanize._form.ParseString(''
我刚刚发现,如果您有一个引发 TOO_MANY_ROWS 异常的 SELECT INTO,该变量仍会从查询检索到的第一条记录中分配值。这是预期的行为吗? 这是我的例子: for co in my_cu
当 SSH 显示 WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED! 我知道当您重新安装远程服务器时会发生这种情况,但我尝试列出 其他原因 . 我知道如何
我有一个枚举和一个 EnumMap . 我将 map 放入一个类中以隐藏“字节”值。所以我有一个set(Parameter, int)和set(Parameter, boolean)方法。 publi
在什么情况下会redis-py引发以下 AttributeError 异常? redis-py 不是设计来引发仅基于 redis.exceptions.RedisError 的异常吗? 什么是合理的处
可悲的是,对此异常的引用通常具有异国情调,并且可能发生在您例如通过 Assembly.GetTypes() 枚举类型- 举个例子,它发生在我们的一个部署上,但同一组程序集在集成服务器上运行良好。 为了
我正在为 Android 下的特定平板电脑克隆一个存储库并获取源代码,我必须执行一个 python 脚本。当我执行它时,我收到此错误消息: Traceback (most recent call la
首先,执行此操作(在运行 4.4.2 的 Nexus 5 上测试): 将 PRIORITY_LOW 通知传递给 Service.startForeground()。 观察通知不显示在状态栏中。 使用相
我尝试使用 AppEngine 的 python 模块 api 来获取使用基本缩放的模块的实例数。在我模块的 yaml 文件中,我明确设置了 max_instances 参数。我希望 get_num_
当我如下运行我的 spark python 代码时: import pyspark conf = (pyspark.SparkConf() .setMaster("local")
在我的系统上,一段适用于 Python 2 的代码不适用于 Python 3。 f = open("plotwidget.svg") svgData = f.read() xml_stream = Q
我是 PHP 和 SQL 的新手,但我正在创建一个登录系统。我遇到的问题是: You have an error in your SQL syntax; check the manual that c
我有一个使用 ebaysdk 库的 python 代码,当我运行代码并输入关键字进行搜索时,我得到了这个错误。 Traceback (most recent call last): File "eba
当我将表单数据发送到我的 Flask 应用程序时,出现以下错误。它说它将使用 UTF-8 编码,但语言环境已经是 UTF-8。这个错误是什么意思? /home/.virtualenvs/project
在python2.7中,跟随pympler example : from anotherfile import somefunction, somecustomclass from os import
我是一名优秀的程序员,十分优秀!