gpt4 book ai didi

java - Spring Kafka - 为任何主题的分区消耗最后 N 条消息

转载 作者:行者123 更新时间:2023-12-02 01:26:22 25 4
gpt4 key购买 nike

我正在尝试读取请求的 kafka 消息数。对于非事务性消息,我们将从 endoffset - N(对于 M 个分区)开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息。对于幂等/事务消息,我们必须考虑事务标记/重复消息,这意味着偏移量将不连续,在这种情况下,endoffset - N 将不会返回 N 条消息,我们需要返回并寻找更多消息,直到我们有 N 条消息对于每个分区或达到开始偏移

由于有多个分区,我需要跟踪所有读取的偏移量,以便在所有操作完成后可以停止。有两个步骤,第一步计算起始偏移量(结束偏移量 - 请求的消息数)和结束偏移量。 (偏移量不连续,有间隙),我会寻找分区从起始偏移量开始消耗。第二步是轮询消息并对每个分区中的消息进行计数,如果我们不满足请求的消息数量,则再次重复第一步和第二步,直到满足每个分区的消息数量。

条件

初始轮询可能不会返回任何记录,因此请继续轮询。当达到每个分区的结束偏移量或轮询不返回结果时停止轮询。检查每个分区读取的消息是否与请求的消息相同。如果是,则标记为完成,如果否,则标记为继续并重复步骤。考虑消息中的间隙。应该适用于事务性和非事务性生产者。

问题:

我将如何跟踪每个分区已读取的所有消息并跳出循环?如果有帮助,每个分区中的消息将按顺序出现。

spring kafka支持这样的用例吗?更多详情可查看here

更新:我要求读取每个分区中的最后 N 条消息。分区和消息数是用户输入的。我想将所有偏移管理保留在内存中。本质上,我们试图按 LIFO 顺序读取消息。这使得它变得棘手,因为卡夫卡允许你向前阅读而不是向后阅读。

最佳答案

为什么有这样的需要,我不明白。当队列中没有任何内容时,Kafka 本身会进行管理。如果消息从一个状态跳转到另一个状态,则可以有单独的队列/主题。不过,这里是如何做到这一点的。

当我们使用类似 - 的方式消费来自分区的消息时 -

ConsumerIterator<byte[], byte[]> it = something; //initialize consumer
while (it.hasNext()) {
MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
String kafkaMessage = new String(messageAndMetadata.message());
int partition = messageAndMetadata.partition();
long offset = messageAndMetadata.offset();
boolean processed = false;
do{
long maxOffset = something; //fetch from db
//if offset<maxOffset, then process messages and manual commit
//else busy wait or something more useful
}while(processed);
}

我们获取有关偏移量、分区号和消息本身的信息。您可以选择使用此信息执行任何操作。

对于您的用例,您可能还决定将消耗的偏移量保存到数据库中,以便下次可以调整偏移量。另外,我建议关闭连接进行清理,并最终将处理后的偏移量保存到数据库。

关于java - Spring Kafka - 为任何主题的分区消耗最后 N 条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58339639/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com