gpt4 book ai didi

apache-kafka - 如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?

转载 作者:行者123 更新时间:2023-12-03 08:58:18 25 4
gpt4 key购买 nike

我想做的是:

  • 使用数字主题(Long's)中的记录
  • 聚合(计数)每 5 秒窗口的值
  • 将 FINAL 聚合结果发送到另一个主题

  • 我的代码如下所示:
    KStream<String, Long> longs = builder.stream(
    Serdes.String(), Serdes.Long(), "longs");

    // In one ktable, count by key, on a five second tumbling window.
    KTable<Windowed<String>, Long> longCounts =
    longs.countByKey(TimeWindows.of("longCounts", 5000L));

    // Finally, sink to the long-avgs topic.
    longCounts.toStream((wk, v) -> wk.key())
    .to("long-counts");
    看起来一切都按预期进行,但聚合被发送到每个传入记录的目标主题。我的问题是如何只发送每个窗口的最终聚合结果?

    最佳答案

    在 Kafka Streams 中没有“最终聚合”这样的东西。窗口始终保持打开状态,以处理在窗口结束时间过后到达的无序记录。然而, window 不会永远保留。一旦保留时间到期,它们就会被丢弃。没有关于何时丢弃窗口的特殊操作。

    有关更多详细信息,请参阅 Confluent 文档:http://docs.confluent.io/current/streams/

    因此,对于聚合的每次更新,都会生成一条结果记录(因为 Kafka Streams 也会更新乱序记录的聚合结果)。您的“最终结果”将是最新的结果记录(在窗口被丢弃之前)。根据您的用例,手动重复数据删除将是解决问题的一种方法(使用较低级别的 API, transform()process() )

    这篇博文也可能有所帮助:https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

    另一篇不使用标点符号解决此问题的博客文章:http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

    更新

    KIP-328 , KTable#suppress()添加了运算符,这将允许以严格的方式抑制连续更新并为每个窗口发出单个结果记录;权衡是增加延迟。

    关于apache-kafka - 如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38935904/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com