gpt4 book ai didi

java - kafka 消费者 API consumer.poll() 不能正常工作,没有异常,只是阻塞

转载 作者:行者123 更新时间:2023-12-01 14:34:32 24 4
gpt4 key购买 nike

我正在按照 Apache kafka 文档学习 kafka。我用默认配置启动它。

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties &

我运行了 kafka-console-producer.sh 和 kafka-console-consumer.sh 来生产和消费消息,成功了。我用生产者API写了一段java代码来生产消息,没问题。这是由 kafka-console-consumer.sh 验证的。代码与 Apache Kafka 指南相同:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

尽管生产者代码有效,但消费者代码不起作用。没有异常,但它只是在 consumer.poll(100) 处阻塞。代码来自 Apache Kafka 文档:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

顺便说一下,Apache Kafka 文档中的 kafka-console-consumer.sh 示例成功消费消息,该消息由生产者生成到主题“test”:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

但是如果我没有连接到zookeeper,而是直接连接到kafka代理,那么它也不会正常工作,没有异常(exception),它只是阻塞。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

kafka 版本和 API 版本为 0.11.0.0

为什么他们不能消费消息?

最佳答案

使用 --zookeeper参数意味着使用旧的消费者,它运行良好,因为您指定了一个 Zookeeper 服务器( localhost:2181 )。

当你想指定一个 Kafka 代理(所以使用新的消费者)时,你必须使用 --bootstrap-server选项:您仍在使用 --zookeeper但传递有效的 Kafka 代理地址 ( localhost:9092 )。

因此,对于控制台使用者应用程序,您的配置需要是 --bootstrap-server localhost:9092而不是 --zookeeper localhost:9092 .

关于您的代码,您确定 poll 方法被阻止了吗?如果没有记录但没有阻塞,它应该在 100 毫秒(您指定的超时)后退出。

然后从你的代码中我看到生产者正在发送到“my-topic”,消费者订阅了“foo”和“bar”;最后,控制台消费者从“测试”中读取。都是不同的话题!

关于java - kafka 消费者 API consumer.poll() 不能正常工作,没有异常,只是阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44995848/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com