
java - KafkaConsumer goes into an indefinite wait state while polling

Reposted. Author: 行者123. Updated: 2023-12-01 17:50:45

I am trying to poll a Kafka topic using the KafkaConsumer API. Even though we pass a poll timeout, the call goes into an indefinite wait and the timeout is not honored.

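The thread dump shows the thread in the RUNNABLE state. I took several thread dumps and the main thread was always at the same place, so I believe it never comes out of the wait.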

"main" #1 prio=5 os_prio=0 tid=0x00007f42a800f000 nid=0x59 runnable [0x00007f42b0782000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000006c02e2088> (a sun.nio.ch.Util$2)
- locked <0x00000006c02e2078> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000006c02e1f60> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:425)
at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:126)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:186)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
at org.test.TestReceiver(TestReceiver:100)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Here is the code snippet. Only the first log line is printed.

LOG.info("Going to wait {}ms for ConsumerRecords", POLLING_TIMEOUT_MILLIS);
ConsumerRecords<String, String> records = consumer.poll(POLLING_TIMEOUT_MILLIS);
LOG.info("Received {} ConsumerRecords to process.", (records != null ? records.count() : null));

Library versions:
kafka_2.11:jar:0.9.0.0
kafka-clients:jar:0.9.0.0

Best answer

KafkaConsumer#poll() can block if it needs to refresh its metadata but cannot connect to the cluster.

This is addressed by KIP-266: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886
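
On clients that predate KIP-266, such as the 0.9.0.0 client in the question, one possible workaround is a watchdog that wakes the consumer from a second thread once a hard deadline passes. The sketch below is only an illustration under stated assumptions. `PollWatchdog` and `callWithDeadline` are hypothetical names, not part of the Kafka client, and the code uses only the JDK so it runs without a broker. With a real consumer, the blocking call would be `consumer.poll(...)` and the wake-up action would be `consumer.wakeup()`, which is documented as safe to call from another thread.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of the Kafka client API. */
final class PollWatchdog {
    private PollWatchdog() {}

    /**
     * Runs blockingCall on the calling thread. If it has not returned within
     * the deadline, wakeup is invoked once from a daemon timer thread.
     * Whatever blockingCall returns or throws is passed through unchanged.
     */
    static <T> T callWithDeadline(Callable<T> blockingCall,
                                  Runnable wakeup,
                                  Duration deadline) throws Exception {
        ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "poll-watchdog");
                    t.setDaemon(true); // never keeps the JVM alive
                    return t;
                });
        // Arm the watchdog before entering the potentially blocking call.
        ScheduledFuture<?> pending =
                timer.schedule(wakeup, deadline.toMillis(), TimeUnit.MILLISECONDS);
        try {
            return blockingCall.call();
        } finally {
            // Disarm the watchdog if the call finished before the deadline.
            pending.cancel(false);
            timer.shutdownNow();
        }
    }

    // Assumed usage with a real consumer (needs kafka-clients on the classpath):
    //   PollWatchdog.callWithDeadline(
    //       () -> consumer.poll(POLLING_TIMEOUT_MILLIS),
    //       consumer::wakeup,
    //       Duration.ofSeconds(30));
}
```

When the watchdog fires, the blocked `poll` should abort with the client's wakeup exception (the class name differs between client versions), which the caller would need to catch. There is a small race: if the deadline elapses just as the call returns, the wake-up may land afterwards and surface on the next `poll`. On clients that include KIP-266 (2.0.0 and later), `poll(Duration)` bounds the total time spent in the call, so a watchdog like this should not be needed.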

Source: a similar question on Stack Overflow about KafkaConsumer going into an indefinite wait state while polling: https://stackoverflow.com/questions/50392782/
