gpt4 book ai didi

java - Kafka消费者输出的数据不一致

转载 作者:行者123 更新时间:2023-12-02 11:20:08 25 4
gpt4 key购买 nike

我需要从 Kafka 消费者中提取数据以将其传递到我的应用程序。下面是我编写的用于访问消费者的代码:

public class ConsumerGroup {
public static void main(String[] args) throws Exception {

String topic = "kafka_topic";
String group = "0";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic: " + topic);

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}

当我运行此代码时,有时会生成数据,有时则不会生成数据。为什么这种行为不一致?我的代码有问题吗?

最佳答案

你的代码没问题。您启用了自动提交选项,因此在读取记录后,它们会自动提交到 Kafka。每次运行代码时,都会从最后处理的偏移量开始,该偏移量存储在 __consumer_offsets 主题中。因此,您始终只读取上次运行后到达 Kafka 的新记录。要在消费者应用程序中不断打印数据,您应该不断地将新记录放入您的主题中。

关于java - Kafka消费者输出的数据不一致,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49996811/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com