gpt4 book ai didi

apache-kafka - Kafka消费者轮询和重新连接

转载 作者:行者123 更新时间:2023-12-04 17:58:11 25 4
gpt4 key购买 nike

我们刚刚开始在我们的项目中使用 Kafka。我们正在使用 kafka_2.11-0.9.0.0。我有一些与 KafkaConsumer 相关的问题。

1) 我在启动 Zookeeper 和 Kafka 服务器之前启动了 Kafka Consumer,但我的 KafkaConsumer 客户端仍然能够连接。我有以下代码行

    Consumer<String, String> consumer =  new KafkaConsumer<String,String>(props);
consumer.subscribe(getConsumerRegisteredTopics());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records){
processRecord (record)
}
}

2) 我读到,Zookeeper 通过使用 poll(long timeout) 方法调用来跟踪事件的 Consumer。如果我使用 Long.MAX_VALUE 在 poll() 中超时,zookeeper 将如何跟踪我的消费者。你能帮我理解 KafkaConsumer 轮询调用的行为吗。

提前致谢。

最佳答案

1) 如果您在启动消费者之前没有启动 zookeeper 和 kafka,它无法连接,但会尝试从 kafka 读取元数据。我的经验是,KafkaConsumer 的“轮询”调用将不确定地阻塞,直到它能够连接和读取元数据。换句话说……您的消费者实际上并没有连接,而是在等待 kafka 集群出现。

2) 轮询超时告诉消费者等待多长时间才能返回任何数据。您必须确保在轮询返回后尽快再次调用轮询以使您的消费者保持活跃。给轮询调用的超时与 KafkaConsumer 的保持事件机制无关(这由您的消费者的消费者属性的 session.timeout.ms 属性控制).

关于apache-kafka - Kafka消费者轮询和重新连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38597037/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com