gpt4 book ai didi

apache-kafka-streams - 仅在流上收到新事件时才抑制触发事件

转载 作者:行者123 更新时间:2023-12-04 08:22:29 28 4
gpt4 key购买 nike

我正在使用 Kafka 流 2.2.1。

我正在使用抑制来阻止事件直到窗口关闭。我正在使用事件时间语义。
但是,只有在流上有新消息可用时才会触发触发消息。

提取以下代码以对问题进行采样:

        KStream<UUID, String>[] branches = is
.branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),
(key, msg) -> "b".equalsIgnoreCase(msg.split(",")[1]),
(key, value) -> true);

KStream<UUID, String> sideA = branches[0];
KStream<UUID, String> sideB = branches[1];

KStream<Windowed<UUID>, String> sideASuppressed =
sideA.groupByKey(
Grouped.with(new MyUUIDSerde(),
Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(31)).grace(Duration.ofMinutes(32)))
.reduce((v1, v2) -> {
return v1;
})
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();

当新消息到达“sideA”流时,消息仅从“sideASuppressed”流式传输(到达“sideB”的消息不会导致抑制器发出任何消息,即使窗口关闭时间已经过去很久了)。
尽管在生产中,由于容量大,问题可能不会发生太多,但在很多情况下,必须不要等待进入“sideA”流的新消息。

提前致谢。

最佳答案

根据 Kafka 流文档:

Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available. If at least one partition does not have any new data available, stream-time will not be advanced and thus punctuate() will not be triggered if PunctuationType.STREAM_TIME was specified. This behavior is independent of the configured timestamp extractor, i.e., using WallclockTimestampExtractor does not enable wall-clock triggering of punctuate().



我不确定为什么会这样,但是,它解释了为什么只有在它使用的队列中有可用消息时才会发出被抑制的消息。

如果有人对为什么实现如此有答案,我将很乐意学习。这种行为导致我的实现发出消息只是为了让我的被抑制的消息及时发出,并导致代码的可读性大大降低。

关于apache-kafka-streams - 仅在流上收到新事件时才抑制触发事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58571651/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com