gpt4 book ai didi

java - 如何仅在窗口完成时将窗口聚合结果发送到输出主题?

转载 作者:行者123 更新时间:2023-12-01 19:30:24 27 4
gpt4 key购买 nike

我在进行窗口聚合时遇到问题。我想对每个键的值进行求和,并且仅在窗口完成时将结果发送到输出主题。问题是“输入”主题中的每个事件都会产生一个“输出”主题的事件。我想仅在窗口完成时将事件发布到输出主题。例如,如果窗口为一分钟,则每分钟向每个键发送一个事件。示例代码如下:

.groupByKey()  
.windowedBy(TimeWindows.of(Duration.ofMinutes(2))
.reduce((v1, v2) -> String.valueOf(Integer.parseInt(v1) + Integer.parseInt(v2)))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream((k,v) -> k.key())
.to("output_topic");

但我遇到以下异常:

Exception in thread "learningtime_application-665cd31a-1957-448b-8cf7-779ab359cfd2-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-REDUCE-STATE-STORE-0000000003 Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

最佳答案

您遇到了一个已知错误:https://issues.apache.org/jira/browse/KAFKA-9259

suppress 运算符无法从配置中正确获取默认的 serdes,即它使用 key serde 而不将其转换为 windowed-key-serdes。

作为解决方法,您需要通过 Materialized.with(...)reduce() 中显式指定 serdes。您传入纯键和值 Serdes,reduce 会将 key-serde 转换为 windowed-key-serde,然后也会传递到 suppress()

关于java - 如何仅在窗口完成时将窗口聚合结果发送到输出主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59264266/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com