gpt4 book ai didi

scala - 如何解决Kafka Consumer轮询超时错误

转载 作者:行者123 更新时间:2023-12-05 06:25:59 25 4
gpt4 key购买 nike

我正在尝试通过 Vagrant 机器使用 Apache Kafka 来运行一个简单的 Kafka 消费者程序。当程序尝试调用 .poll(100) 方法时,它在 for 循环之前卡住了。

很多人为了调试而深入挖掘更深层次的类,但发现的并不多。

val TOPIC="testTopic"

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.10:9092")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)

consumer.subscribe(util.Collections.singletonList(TOPIC))

while(true) {
println("Test")
val records = consumer.poll(100)
for (record <- records.asScala) {
println(record)
}
println("Test2")
}

当前输出测试,然后卡住了,没有错误消息。预计会输出Kafka主题的内容。

最佳答案

您需要升级您的kafka-clients 版本到2.0.0 或更高版本。当 kafka 服务器关闭时,例如,使用 KafkaConsumer 类中的 poll 方法,您将陷入内部循环,等待代理再次可用。

根据 KIP-266 :

ConsumerRecords

poll​(long timeout)

Deprecated. Since 2.0. Use poll(Duration), which does not block beyond the timeout awaiting partition assignment. See KIP-266 for more information.

在你的情况下:

import org.apache.kafka.clients.consumer.KafkaConsumer; 
import scala.concurrent.duration._

// ...
val timeout = Duration(100, MILLISECONDS)

while(true) {
println("Test")
val records = consumer.poll(timeout)
for (record <- records.asScala) {
println(record)
}
println("Test2")
}

//...

总之,您只需要导入新版本的KafkaConsumer 类,并将超时参数作为Duration 对象的实例传递给新的poll 方法。

关于scala - 如何解决Kafka Consumer轮询超时错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56534248/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com