gpt4 book ai didi

java - 延迟 Kafka Streams 消费

转载 作者:太空宇宙 更新时间:2023-11-04 09:14:24 26 4
gpt4 key购买 nike

我正在尝试使用Kafka Streams(即不是简单的Kafka Consumer)从重试主题中读取之前无法处理的事件。我希望从重试主题中进行消费,如果处理仍然失败(例如,如果外部系统已关闭),我希望将事件放回到重试主题上。因此我不想立即继续消费,而是等待一段时间再消费,以免系统被暂时无法处理的消息淹没。

简单地说,代码当前执行此操作,我希望为其添加延迟。

fun createTopology(topic: String): Topology {
val streamsBuilder = StreamsBuilder()

streamsBuilder.stream<String, ArchivalData>(topic, Consumed.with(Serdes.String(), ArchivalDataSerde()))
.peek { key, msg -> logger.info("Received event for key $key : $msg") }
.map { key, msg -> enrich(msg) }
.foreach { key, enrichedMsg -> archive(enrichedMsg) }

return streamsBuilder.build()
}

我尝试使用窗口延迟来设置它,但没有成功。我当然可以在 peek 中进行 sleep ,但这会导致线程挂起,并且听起来不是一个非常干净的解决方案。

延迟如何运作的具体细节对于我的用例来说并不是非常重要。例如,所有这些都可以正常工作:

  1. 过去 x 秒内该主题的所有事件都会立即消耗。开始/完成消费后,流会等待 x 秒,然后再次消费
  2. 每个事件都会在发布到主题后 x 秒进行处理
  3. 流在每个事件之间延迟 x 秒消耗消息

如果有人能够提供几行 Kotlin 或 Java 代码来完成上述任何任务,我将不胜感激。

最佳答案

您无法真正暂停使用 Kafka Streams 从输入主题读取数据 - “延迟”的唯一方法是调用“ sleep ”,但正如您所提到的,这会阻塞整个线程并且不是一个好的解决方案。

但是,您可以使用有状态处理器,例如 process() (带有附加状态存储)而不是 foreach()。如果重试失败,您不会将记录放回到输入主题中,而是将其放入存储中,并注册一个具有所需重试延迟的标点符号。如果标点符号触发,您将重试,如果重试成功,您将从存储中删除该条目并取消标点符号;否则,您将等待标点符号再次触发。

关于java - 延迟 Kafka Streams 消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59249523/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com