gpt4 book ai didi

java - 卡夫卡 : consume all messages on demand

转载 作者:行者123 更新时间:2023-12-01 19:44:51 26 4
gpt4 key购买 nike

目标:读取主题中的所有消息,然后终止进程。

我能够连续阅读以下消息:

props.put("bootstrap.servers", kafkaBootstrapSrv);
props.put("group.id", group_id);
props.put("max.poll.records", 1); // Only get one record at a time. I understand that to read all messages this will need to be increased
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("MY_TOPIC"));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);

for (ConsumerRecord<String, String> record : records) {
process_record(record);
}

consumer.commitSync();
}

但在这种情况下,进程永远不会终止。当我摆脱

while (true)

循环并运行程序,它不会从主题中获取记录(我希望有一条记录)。这是为什么?

最佳答案

Kafka 主题基本上实现了无限的事件流。

那么当从一个主题中消费时什么时候停止呢?你怎么知道你已经到达终点了?简短的回答是你不知道!理论上,生产者总是可以向主题发送新消息。

实际上,假设没有/很少有新记录被附加,您可以采取一些措施在最后停止。

使用endOffsets()您可以找到分区当前的最后偏移量。一旦消费者达到了分配给它的所有分区的偏移量,您就可以停止轮询(或刷新它并查看是否已发送新消息)。

您可以使用 position() 检索每个分区中的当前位置。方法。当使用时,每条记录还通过offset()公开其自己的偏移量。 。因此,您可以使用它们来跟踪最终偏移量的进度。

关于关于 poll() 第一次调用时不返回任何内容的第二个问题。这是预期的,因为基本上 poll() 使客户端工作,并且在第一次调用时,它将启动与集群的连接并启动组协议(protocol)(需要几秒钟),因此不太可能发送消息在 poll() 返回之前就已经收到了。

关于java - 卡夫卡 : consume all messages on demand,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53766985/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com