gpt4 book ai didi

apache-kafka-streams - Kafka Stream - 如果在一段时间内没有收到给定键的事件,如何发送警报

转载 作者:行者123 更新时间:2023-12-04 08:21:58 26 4
gpt4 key购买 nike

如果在一段时间内没有在给定键的主题中收到任何事件,我需要发送警报。用 KafkaStream 解决这个用例的最佳方法是什么?

我试过:

1) 一个窗口由 连同 压制 运算符(operator):

    stream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1000)).grace(Duration.ZERO))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()))
.filter((k, v) -> v == 0)
.toStream()
.map((windowId, count) -> KeyValue.pair(windowId.key(), AlarmEvent.builder().build()))
.to(ALARMS, Produced.with(Serdes.String(), AlarmEvent.serde()));

但似乎在到期后发生事件之前窗口不会关闭,因此无法在定义的超时后准确发送警报。

2) 使用 处理器 带有 的 API标点符号 ,它似乎有效,但我只测试了 拓扑测试驱动程序 AdvanceWallClockTime() .不确定这个 advanceWallClockTime() 是否反射(reflect)了实时提前,或者只会在事件接收时改变,从而回到 1) 中的问题。

3) 如果标点符号有效,我想在 中使用它值(value)转换器 受益于 DSL 拓扑。但是,我遇到了 How to forward event downstream from a Punctuator instance in a ValueTransformer? 中描述的问题。 .无法从标点符号实例向下游发送事件。

4)最后,我想到了为每个分区定期(例如每秒)注入(inject)一些虚拟事件,以人为地强制内部时钟前进。这将允许我使用干净简单的 DSL 窗口并抑制运算符。

最佳答案

2) Using processor API with a punctator, it seems to work but I only tested with a TopologyTestDriver and advanceWallClockTime(). Not sure this advanceWallClockTime() relflects real time advance, or would only change upon event reception, thus falling back to the problem in 1).



这是正确的做法。顾名思义,可以根据挂钟时间(即系统时间)触发标点符号。 TopologyTestDriver模拟挂钟时间以进行测试,但 KafkaStreams将使用系统时间。

3) If punctuator works, I would like to use it in a ValueTranformer to benefit from the DSL topology. However, I am encountering the problem described in How to forward event downstream from a Punctuator instance in a ValueTransformer?. Cannot send event downstream from the punctuator instance.



您需要使用 transform()反而。通过 forward() 发送数据 ValueTransformer 的标点符号中不允许使用因为您可以发出任何 key ,违反未修改 key 的契约(Contract)。

4) Finally, I had the idea to inject some dummy events on a regular basis (eg. every second) for every partitions so as to artificially force the inner clock to advance. This would allows me to use the clean and simple DSL window and suppress operators.



这也应该有效。

关于apache-kafka-streams - Kafka Stream - 如果在一段时间内没有收到给定键的事件,如何发送警报,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56377897/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com