gpt4 book ai didi

java - Kafka - 使用高级消费者实现延迟队列

转载 作者:太空狗 更新时间:2023-10-29 22:34:21 25 4
gpt4 key购买 nike

想要使用高级消费者 api 实现延迟消费者

主要思想:

  • 按 key 生成消息(每个消息包含创建时间戳)这确保每个分区都按生成时间对消息进行排序。
  • auto.commit.enable=false(将在每个消息处理后显式提交)
  • 消费一条消息
  • 检查消息时间戳并检查是否已经过了足够的时间
  • 处理消息(这个操作永远不会失败)
  • 提交 1 个偏移量

    while (it.hasNext()) {
    val msg = it.next().message()
    //checks timestamp in msg to see delay period exceeded
    while (!delayedPeriodPassed(msg)) {
    waitSomeTime() //Thread.sleep or something....
    }
    //certain that the msg was delayed and can now be handled
    Try { process(msg) } //the msg process will never fail the consumer
    consumer.commitOffsets //commit each msg
    }

关于此实现的一些问题:

  1. 提交每个偏移量可能会减慢 ZK 速度
  2. consumer.commitOffsets 可以抛出异常吗?如果是,我将使用相同的消息两次(可以使用幂等消息解决)
  3. 等待很长时间没有提交偏移量的问题,例如延迟时间为 24 小时,将从迭代器获取下一个, hibernate 24 小时,处理并提交(ZK session 超时?)
  4. ZK session 如何在不提交新偏移量的情况下保持 Activity 状态? (设置一个hive zookeeper.session.timeout.ms可以解决死消费者不识别)
  5. 我还遗漏了其他问题吗?

谢谢!

最佳答案

解决此问题的一种方法是使用不同的主题,您可以在其中推送所有要延迟的消息。如果所有延迟的消息都应在相同的时间延迟后处理,这将是相当简单的:

while(it.hasNext()) {
val message = it.next().message()

if(shouldBeDelayed(message)) {
val delay = 24 hours
val delayTo = getCurrentTime() + delay
putMessageOnDelayedQueue(message, delay, delayTo)
}
else {
process(message)
}

consumer.commitOffset()
}

现在将尽快处理所有常规消息,而需要延迟的消息将放在另一个主题上。

好消息是我们知道延迟主题头部的消息是应该首先处理的消息,因为它的 delayTo 值将是最小的。因此,我们可以设置另一个读取头消息的消费者,检查时间戳是否在过去,如果是,则处理消息并提交偏移量。如果不是,它不会提交偏移量,而是一直睡到那个时间:

while(it.hasNext()) {
val delayedMessage = it.peek().message()
if(delayedMessage.delayTo < getCurrentTime()) {
val readMessage = it.next().message
process(readMessage.originalMessage)
consumer.commitOffset()
} else {
delayProcessingUntil(delayedMessage.delayTo)
}
}

如果有不同的延迟时间,您可以根据延迟对主题进行划分(例如 24 小时、12 小时、6 小时)。如果延迟时间比它更动态,它就会变得有点复杂。您可以通过引入两个延迟主题来解决它。读取延迟主题 A 的所有消息,并处理所有 delayTo 值为过去的消息。在其他主题中,您只需找到最接近 delayTo 的主题,然后将它们放在主题 B 上。 hibernate 直到最接近的一个应该被处理并以相反的方式执行所有操作,即处理来自主题 B 的消息并将尚未处理的一次放回主题 A .

回答您的具体问题(有些问题已在您问题的评论中解决)

  1. Commit each offset might slow ZK down

您可以考虑切换到在 Kafka 中存储偏移量(从 0.8.2 开始可用的功能,请查看消费者配置中的 offsets.storage 属性)

  1. Can consumer.commitOffsets throw an exception? if yes, I will consume the same message twice (can solve with idempotent messages)

我相信它可以,例如,如果它不能与偏移存储通信。正如您所说,使用幂等消息可以解决这个问题。

  1. Problem waiting long time without committing the offset, for example delay period is 24 hours, will get next from iterator, sleep for 24 hours, process and commit (ZK session timeout?)

除非消息本身的处理时间超过 session 超时时间,否则上述解决方案不会有问题。

  1. How can ZK session keep-alive without commit new offsets? (setting a hive zookeeper.session.timeout.ms can resolve in dead consumer without recognizing it)

再次说明,您不需要设置很长的 session 超时。

  1. Any other problems I'm missing?

总是有;)

关于java - Kafka - 使用高级消费者实现延迟队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31775003/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com