gpt4 book ai didi

java - Kafka命令行consumer读取,但无法通过Java读取

转载 作者:行者123 更新时间:2023-12-01 19:44:53 25 4
gpt4 key购买 nike

我使用此命令手动创建了主题test:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

并使用此命令:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

我插入了这些记录:

This is a message
This is another message
This is a message2

首先,我通过命令行使用消息,如下所示:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

所有记录均已成功显示。然后,我尝试使用以下代码在 Java 中实现消费者:

public class KafkaSubscriber {

public void consume() {

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
// also with this command
// consumer.subscribe(Arrays.asList("test"));

System.out.println("Starting to read data...");

try {
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println("Number of records found: " + records.count());
for (ConsumerRecord rec : records) {
System.out.println(rec.value());
}
}
catch (Exception ex) {
ex.printStackTrace();
}
}
}
catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}

但是输出是:

Starting to read data...
0
0
0
0
0
....

这意味着它在主题test中没有找到任何记录。我还尝试在 Java 消费者启动后发布一些记录,但还是一样。知道可能出了什么问题吗?

<小时/>

编辑:添加以下行后:

 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

消费者现在仅当我向该主题写入新记录时才读取。它不会从头开始读取所有记录。

最佳答案

默认情况下,如果之前没有为该组提交任何偏移量,则消费者从末尾主题开始。

因此,如果您在生成记录后运行它,它将不会收到它们。

请注意,在您的kafka-console-consumer.sh中,您有--from-beginning标志,它强制消费者从主题的开头开始.

正如评论中所建议的,一种解决方法是将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为最早。不过,我会谨慎对待该设置,因为您的消费者将从主题开始就进行消费,并且在实际用例中这可能会包含大量数据。

最简单的解决方案是,现在您已经运行了一次消费者并且它创建了一个组,您只需重新运行生产者即可。之后,当您再次运行消费者时,它将从新生产者消息之前的最后一个位置开始。

另一方面,如果您打算始终重新使用所有消息,那么您有 2 个选择:

  • 当您的消费者开始将其位置移至主题开头时,显式使用 seekToBeginning()

  • auto.offset.reset 设置为最早,并通过将 enable.auto.commit 设置为 禁用自动偏移提交>假

关于java - Kafka命令行consumer读取,但无法通过Java读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53757404/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com