gpt4 book ai didi

java - 卡夫卡流: Flushing intermediate Windowed results as commit interval and window time are not in sync

转载 作者:行者123 更新时间:2023-12-02 09:05:41 25 4
gpt4 key购买 nike

kafka 流的配置:

threads = 1;
replicationFactor = 1;
ktableCommitInterval= 10000;
ktableMemory=72000000;
timeDuration=10;

拓扑:

KStream<Windowed<String>,String> windowedStringKStream =
streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(),Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(),Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(timeDuration)).grace(Duration.ofSeconds(0)))
.reduce(Numners::append,Materialized.<String, String, WindowStore<Bytes,byte[]>>as(storeName).withCachingEnabled().withRetention(Duration.ofSeconds(timeDuration)).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
.toStream();


代码说明:

Code appends numbers in a 10 Second window. Incremental Number records are sent exactly at a interval of 1 second into input topic.

输出: Output topic results with timestamp from kafka tool. As we can see every alternate output is a intermediate window result

问题:

提交间隔设置为 10 秒。缓存大小设置为 72 MB。数据以字节为单位。状态存储已启用缓存。文档指出,kafka 流将数据推送到下游的操作语义取决于缓存大小或提交间隔(无论先发生什么)。但根据实验,提交在一分钟内发生两次。观察结果是,提交间隔在应用程序启动时开始,但窗口在数据开始到来时开始。如图所示,中间窗口结果被推送,最终窗口结果也被推送。

对于我正在处理的用例,不可能使用 Suppress(),因为如果该主题没有新数据,它不会刷新数据。

如有任何帮助,我们将不胜感激。如果有人面临这个问题或想要重现这个问题,请告诉我。

最佳答案

唯一的解决方案是构建一个自定义的 transform() 来实现自定义版本的“抑制”,即使没有新的输入数据到达(例如,挂钟),您也可以发出数据时间标点符号可能有助于实现它)。

目前(从 Apache Kafka 2.4 版本开始),没有内置支持。如果不使用 suppress(),窗口聚合可能总是在窗口实际关闭之前发出一些中间结果,并且无法以不同方式配置 Kafka Streams。

关于java - 卡夫卡流: Flushing intermediate Windowed results as commit interval and window time are not in sync,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59834936/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com