gpt4 book ai didi

java - 如何在 Java 中使用来自 Kafka 的消息,从特定偏移量开始

转载 作者:行者123 更新时间:2023-12-04 10:32:00 25 4
gpt4 key购买 nike

从最早阅读:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

从最新阅读:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

但是如果我想开始,我应该使用哪一行:从第 18 次提交开始?

最佳答案

您可以使用 seek() 为了强制消费者从特定的偏移量开始消费:

public void seek(TopicPartition partition, long offset)

Overrides the fetch offsets that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets



例如,假设您想从偏移量 18 开始:
TopicPartition tp = new TopicPartition("myTopic", 0);
Long startOffset = 18L

List<TopicPartition> topics = Arrays.asList(tp);
consumer.assign(topics);
consumer.seek(topicPartition, startOffset);

// Then consume messages as normal..

关于java - 如何在 Java 中使用来自 Kafka 的消息,从特定偏移量开始,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60375581/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com