gpt4 book ai didi

java - Kafka consumer.poll 不返回任何记录

转载 作者:行者123 更新时间:2023-11-30 10:04:17 29 4
gpt4 key购买 nike

当我用新组 ID 注册消费者时,前 N 次轮询调用没有返回任何结果。

我想测试一下,当我调用服务时,会发布一个 Kafka 事件。问题是每当我更改 groupId 时,前 N 个民意调查都不会返回任何内容。我了解 Kafka 在轮询时首先注册消费者,但我发现注册消费者所需的轮询次数(时间)过于随机。

消费者配置:

Properties props = new Properties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));

步骤:

  1. 在每次测试之前,我都会调用 consumer.poll(Duration.ofSeconds(5)) 以确保消费者已注册并设置了偏移量。
  2. 我调用该服务并对响应进行断言。如果我使用 UI 检查 Kafka,事件就会发布。
  3. 我调用 consumer.poll(Duration.ofSeconds(5)) 并希望收到一些记录。 这是失败的步骤

有没有办法确保第二次轮询总是返回记录?我试着让第一个投票持续 1 分钟(我已经认为 5 秒对于等待每个测试来说太长了),它仍然有时有效,有时无效。

谢谢。

最佳答案

它不适用于您的“新 groupId”的原因是您处于“最新”模式。

默认值为“最新”,您需要处于“最早”模式或第一次使用您的“新组 ID”进行轮询或为该主题提交此“新组 ID”的偏移量。

您需要将“groupId”注册到主题,而不是消费者。

关于java - Kafka consumer.poll 不返回任何记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55949910/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com