gpt4 book ai didi

java - 根据时间戳检索kafka主题中的偏移量

转载 作者:行者123 更新时间:2023-12-02 03:50:14 25 4
gpt4 key购买 nike

我尝试根据 timetsamp 检索偏移量,但是当我运行代码时,它在此处抛出空指针错误“LongeekOffset = outOffsets.get(partition).offset();”因为特定时间没有偏移量我的问题是如何获得该特定时间戳上最接近的偏移量

下面的代码我尝试过,


Long startTimestamp=Instant.now().minus (10, ChronoUnit.MINUTES ).toEpochMilli();

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition partition : partitions) {
timestampsToSearch.put(partition, startTimestamp);
}
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
for (TopicPartition partition : partitions) {
Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset);

任何帮助将不胜感激!提前致谢

最佳答案

offsetsForTimes 方法返回时间戳大于或等于目标时间戳的第一条消息的偏移量。如果为空则没有这样的消息。在这种情况下,只需将消费者定位到最后即可。

for (TopicPartition partition : partitions) {
OffsetAndTimestamp seekOffset = outOffsets.get(partition);
if(seekOffset!=null){
consumer.seek(partition, seekOffset.offset());
}else{
consumer.seekToEnd(Collections.singleton(partition));
}
}

关于java - 根据时间戳检索kafka主题中的偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56783876/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com