
java - Kafka consumer does not receive messages in Java

Reposted. Author: 行者123. Updated: 2023-11-30 07:04:43

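In my code below, the consumer subscribes to an existing topic but never receives any messages from it. It just keeps waiting, while the Kafka console consumer receives the messages correctly. Please help.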

package kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {

        // Kafka consumer configuration settings
        String topicName = "test12";
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // The consumer subscribes to a list of topics here.
        consumer.subscribe(Arrays.asList(topicName));

        // Print the topic name.
        System.out.println("Subscribed to topic " + topicName);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // Print the offset, key and value of each consumer record.
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

Best answer

The consumer's offset may already be at the latest position. You can try seeking to the beginning.
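One likely reason the offset is already at the end, even though the question sets auto.offset.reset to "earliest", is that this setting only applies when the group has no committed offset for a partition. The sketch below is a simplified model of that rule, not Kafka's actual code. The class StartOffsetSketch, its method, and the offsets used are hypothetical, and the model ignores the case where a committed offset is out of range.

```java
import java.util.OptionalLong;

public class StartOffsetSketch {

    // Simplified model (an assumption for illustration): a committed offset
    // always wins; auto.offset.reset is consulted only when none exists.
    public static long startOffset(OptionalLong committed, String autoOffsetReset,
                                   long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return beginningOffset;
            case "latest":
                return endOffset;
            default:
                throw new IllegalStateException("no committed offset and no reset policy");
        }
    }

    public static void main(String[] args) {
        // The group has already committed offset 42, so "earliest" is ignored.
        System.out.println(startOffset(OptionalLong.of(42), "earliest", 0, 42)); // prints 42
        // A group with no committed offset starts from the beginning.
        System.out.println(startOffset(OptionalLong.empty(), "earliest", 0, 42)); // prints 0
        // Without a committed offset, "latest" starts at the end.
        System.out.println(startOffset(OptionalLong.empty(), "latest", 0, 42)); // prints 42
    }
}
```

If this is the cause, using a group.id that has never committed offsets should make the consumer in the question start from the beginning without any explicit seek.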

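// Requires java.util.Set and org.apache.kafka.common.TopicPartition.
// assignment() is empty until a poll() has completed the group join,
// so run this after the first poll() rather than right after subscribe().
Set<TopicPartition> partitions = consumer.assignment();
for (TopicPartition partition : partitions) {
    long offset = consumer.position(partition);
    System.out.println(partition.partition() + ": " + offset);
}
consumer.seekToBeginning(partitions);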
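seekToBeginning acts on partitions that are currently assigned to the consumer, and with subscribe() the assignment only happens inside poll(). One way to seek at the right moment is to pass a ConsumerRebalanceListener to subscribe(). The following is a minimal sketch, assuming kafka-clients 0.10.1 or newer on the classpath; the class name SeekToBeginningOnAssign is hypothetical and is not part of the original answer.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;

/**
 * Hypothetical helper: rewinds every newly assigned partition to its first offset.
 * Register it when subscribing, for example:
 *   consumer.subscribe(Arrays.asList(topicName), new SeekToBeginningOnAssign(consumer));
 */
public class SeekToBeginningOnAssign implements ConsumerRebalanceListener {

    private final Consumer<?, ?> consumer;

    public SeekToBeginningOnAssign(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to clean up in this sketch.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Invoked from within poll() once partitions have been assigned,
        // so the partitions passed in here are valid targets for a seek.
        consumer.seekToBeginning(partitions);
    }
}
```

Note that this rewinds on every rebalance, so messages are re-read each time partitions are reassigned. That is usually acceptable for debugging a consumer that appears to receive nothing, but probably not what you want in production.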

Regarding "java - Kafka consumer does not receive messages in Java", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40337708/
