gpt4 book ai didi

java - 如何获取每个分区当前最新的偏移量,然后只消耗该偏移量?

转载 作者:太空宇宙 更新时间:2023-11-04 09:14:53 24 4
gpt4 key购买 nike

我正在尝试检查接收大量数据的主题中是否缺少键。由于该作业是按需运行的,因此它需要一些标准来知道它何时搜索过它关心的所有记录。我们确定这将是作业启动时每个分区的最新偏移量。

我的问题首先是如何获取主题的所有分区信息而不实际使用它(我需要使用它为每个分区创建单独的消费者,以跟踪其偏移量与最大偏移量)。

第二,如何在消费者看到它达到最大偏移量后停止它。

编辑:我找到了一种获取分区的方法,即为单个消费者订阅该主题,进行虚拟轮询,然后使用partitionsFor(...)。不确定这是否是“推荐”的方法。

最佳答案

您可以使用consumer.partitionsFor和consumer.endOffsets获取分区和最后的偏移量

分区

 /*Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it does  not already have any metadata about the given topic.*/ 
public java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)

结束偏移

/*Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1*/
public java.util.Map<TopicPartition,java.lang.Long> endOffsets(java.util.Collection<TopicPartition> partitions)

.

下面是示例代码

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerid");
Consumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProperties);
List<PartitionInfo> parts = consumer.partitionsFor(topic);
consumer.assign(partitions);
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for (TopicPartition tp : offsets.keySet()) {
OffsetAndMetadata commitOffset = consumer.committed(new
TopicPartition(tp.topic(), tp.partition()));
//Consumer offset for partition tp
long offset=offsets.get(tp);
//Consumed committed offset
long consumedOffset=commitOffset.offset();
}

关于java - 如何获取每个分区当前最新的偏移量,然后只消耗该偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59166910/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com