gpt4 book ai didi

java - Kafka Consumer仅在产生 'enough'数据后才读取

转载 作者:太空宇宙 更新时间:2023-11-04 09:39:01 25 4
gpt4 key购买 nike

我正在 spring-boot 中实现一个端点,当调用该端点时,它将转储 kafka 主题中的所有消息(用于测试)。

我期望的行为是,当生产者写入“testTopic”主题,随后消费者进行轮询时,它应该读取刚刚生成的消息。

我观察到的行为是消费者无法消费生成的消息。此外,如果生产者生成了更多消息(例如 10-15 条),那么消费者将一次性转储所有消息。从这一点开始,如果生产者生产一条消息,那么消费者就会按预期消费。

直觉上我认为设置 FETCH_MIN_BYTES_CONFIG 可能与此有关 - 也许消费者正在等待写入足够的字节。但这已经设置为 1 字节(默认值),并且不能解释后续成功的单个消息读取。

接下来我想也许我在创建主题之前注册了消费者(通过太快地调用注册端点)。但我在注册消费者之前从 kafka-topics.sh 确认该主题存在。

我注意到,如果我启用偏移量自动提交,那么行为有时会符合预期,有时则不会。通过手动提交偏移量(下面的代码中未显示),如上所述,行为非常奇怪。

我还通过使用 kafka-console-consumer 确认生产者正在按预期工作。

还尝试将轮询超时增加到 1 秒,但没有成功。

// Consumer
@Component
public class TestConsumer{
private KafkaConsumer testConsumer = null;

public void registerConsumer(final String consumerId) {
if (consumer == null) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some_address>:<some_port>");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");

testConsumer = new KafkaConsumer<String, String>(props);
testConsumer.subscribe(Collections.singletonList("testTopic"));
}
else{
logger.debug("Consumer already registered");
}
}

public Map<String, List<String>> consume() {
Map<String, List<String>> messages = new HashMap<>();
if (testConsumer == null){
logger.error("testConsumer was not instantiated");
return null;
}
ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(100));
List<String> buffer = new ArrayList<>();
for (ConsumerRecord<String, String> record: records){
logger.debug(String.format("Consuming %s", record.value()));
buffer.add(record.value());
}
messages.put("data", buffer);
return messages;
}
}

步骤顺序是:1. Spring Boot应用程序启动2. kafka主题已创建,我可以通过kafka控制台确认3.我注册生产者和消费者4. Producer生产,我可以通过kafka控制台确认这一点(不同的消费者组)5.消费者消费失败

我期望结果如下:

{
"data" : ["message1"]
}

我得到的是

{
"data" : []
}

知道为什么消费者在写入阈值数量的消息之前不消费记录吗?

EDIT_1:向消费者添加了 props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 属性,但没有效果。

最佳答案

当您手动调用此testConsumer.poll(Duration.ofMillis(100))时。你需要不断地从主题中汲取灵感。就像在无限 while 循环中一样。例如:

while (true) {
Map records = consume();
logger.debug("received records: {}", records);
}

看看这个链接:Kafka consumer

关于java - Kafka Consumer仅在产生 'enough'数据后才读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56178403/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com