gpt4 book ai didi

java - 即使使用 auto.offset.reset,我的 Java Consumer 也无法从 Broker 读取消息 - 最早

转载 作者:行者123 更新时间:2023-11-30 05:32:45 27 4
gpt4 key购买 nike

我的 Broker 中有一个名为“test”的主题。我用 CLI 检查过。

我创建了一个 java 生产者来将消息发送到主题 test。我可以从 CLI 中使用它们。

.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning (我在 Windows 上运行它)

但是,当我在 Java Consumer 程序中运行它时,即使我将 auto.offset.reset 设置为 earliest,它也不会消耗任何消息。我究竟做错了什么?

public class Consumer1 {

public static void main(String[] args) {

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "jin");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

//consumer.subscribe(Collections.singletonList("test"));
consumer.subscribe(Arrays.asList("test"));

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
//ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
//consumer.commitAsync();
}
} catch (Exception e){
e.printStackTrace();
} finally {
consumer.close();
System.out.println("closed");
}
}
}

最佳答案

auto.offset.reset如果它是一个全新的消费者组,或者消费者组偏移被删除,则该属性将会出现。它不适用于已经在 Kafka 中存储了偏移量的消费者组

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

关于java - 即使使用 auto.offset.reset,我的 Java Consumer 也无法从 Broker 读取消息 - 最早,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57212222/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com