gpt4 book ai didi

java - 永远的卡夫卡消费者民意调查

转载 作者:行者123 更新时间:2023-11-30 02:03:27 27 4
gpt4 key购买 nike

我想弄清楚为什么 consumer.poll 在我的测试中永远挂起。

在 Debug模式下,消费者似乎无法在无限的 while 循环中找到组协调器。

我的测试代码:

final String BROKER_PORT = "9092";
final String HOST = "localhost";
final String BOOTSTRAP_SERVERS = HOST + ":" + BROKER_PORT;
final String ZK_PORT = "2181";
final Integer ZK_PORT_INT = Integer.valueOf(ZK_PORT);
final String ZK_HOST = HOST + ":" + ZK_PORT;

final String topic = "test-topic-10";

//start zookeeper
String path = new File(".").getCanonicalPath();
zookeeper = new TestingServer(ZK_PORT_INT, new File(path));
Thread.sleep(5_000);

//start broker
final File logDirectory = Files.createTempDir();
logDirectory.deleteOnExit();
final Properties p = new Properties();
p.put("zookeeper.connect", zookeeper.getConnectString());
p.put("broker.id", "1");
p.put("num.partitions", "1");
p.put("host.name", HOST);
p.put("port", BROKER_PORT);
p.put("log.dir", logDirectory.getAbsolutePath());
p.put("auto.create.topics.enable", "true");
p.put("delete.topic.enable", "true");
p.put("log.cleaner.dedupe.buffer.size", 2 * 1024 * 1024L + "");
new KafkaServerStartable(new KafkaConfig(p)).startup();

//send one record with producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>(topic, "key", "val"));

// Try to poll record with consumer
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
System.out.println("POLL!");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofDays(1));
System.out.println(records);

最佳答案

将以下属性添加到您的消费者并重试

 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

关于java - 永远的卡夫卡消费者民意调查,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52020601/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com