gpt4 book ai didi

kotlin - KafkaStreams : Getting Window Final Results

转载 作者:行者123 更新时间:2023-12-02 11:24:30 29 4
gpt4 key购买 nike

是否有可能获得window final result在 Kafka Streams 中通过抑制中间结果。

我无法实现这个目标。我的代码有什么问题?

    val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded())) // not working)
.toStream()
.print(Printed.toSysOut())

它导致此错误:
Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001: 
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String

代码/错误详情: https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380

最佳答案

问题是 Streams 在窗口期间自动包装显式 serde,但不自动包装默认 serde 的方式存在令人困惑的不对称性。恕我直言,这是一个应该纠正的疏忽,所以我提交了:https://issues.apache.org/jira/browse/KAFKA-7806

正如其他人所指出的,解决方案是在上游显式设置 key serde,而不依赖于默认 key serde。您可以:

使用 Materialized 在窗口聚合上设置 serdes

val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())

(如妮舒推荐的那样)

(注意,没有必要命名 count 操作,它具有使其可查询的副作用)

或者在上游设置 serdes,例如在输入上:
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())

(正如wardziniak推荐的那样)

这是你的选择;我认为在这种情况下,两种情况都没有太大不同。如果您正在执行与 count 不同的聚合,您可能会通过 Materialized 设置 serde 值无论如何,所以也许前者会是一种更统一的风格。

我还注意到您的窗口定义没有设置宽限期。窗口关闭时间定义为 window end + grace period ,默认值为 24 小时,因此在应用程序运行 24 小时的数据之前,您不会看到抑制发出的任何内容。

为了您的测试工作,我建议您尝试:
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))

在生产中,您需要选择一个宽限期,以平衡您在流中期望的事件延迟量与您希望从抑制中看到的发射及时性量。

最后一点,我在您的要点中注意到您没有更改默认缓存或提交间隔。结果,您会注意到 count在将更新传递给抑制之前,运算符(operator)本身将在默认 30 秒内缓冲更新。这是一个很好的生产配置,因此您不会对本地磁盘或 Kafka 代理造成瓶颈。但是在您进行测试时,它可能会让您感到惊讶。

通常对于测试(或以交互方式尝试内容),我将禁用缓存并将提交间隔设置为短以实现最大的开发人员理智:
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

对 serde 的疏忽感到抱歉。我希望我们能尽快解决 KAFKA-7806。

我希望这有帮助!

关于kotlin - KafkaStreams : Getting Window Final Results,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54110206/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com