gpt4 book ai didi

java - 有没有办法从Kafka主题中获取最后一条消息?

转载 作者:行者123 更新时间:2023-12-04 01:03:20 25 4
gpt4 key购买 nike

我有一个带有多个分区的 Kafka 主题,我想知道 Java 中是否有办法获取该主题的最后一条消息。我不关心我只想获取最新消息的分区。

我试过 @KafkaListener但它仅在主题更新时获取消息。如果在应用程序打开后没有发布任何内容,则不会返回任何内容。

也许听众根本就不是解决问题的正确方法?

最佳答案

以下代码段对我有用。你可以试试这个。评论中的解释。

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));

consumer.poll(Duration.ofSeconds(10));

consumer.assignment().forEach(System.out::println);

AtomicLong maxTimestamp = new AtomicLong();
AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();

// get the last offsets for each partition
consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {
System.out.println("offset: "+offset);

// seek to the last offset of each partition
consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);

// poll to get the last record in each partition
consumer.poll(Duration.ofSeconds(10)).forEach(record -> {

// the latest record in the 'topic' is the one with the highest timestamp
if (record.timestamp() > maxTimestamp.get()) {
maxTimestamp.set(record.timestamp());
latestRecord.set(record);
}
});
});
System.out.println(latestRecord.get());

关于java - 有没有办法从Kafka主题中获取最后一条消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57722688/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com