gpt4 book ai didi

java - 卡夫卡流 : Should we advance stream time per key to test Windowed suppression?

转载 作者:行者123 更新时间:2023-12-05 05:01:13 24 4
gpt4 key购买 nike

我从This blog中学到了和 this tutorial为了用事件时间语义测试抑制,应该发送虚拟记录来提前流时间。我试图通过这样做来提前时间。但这似乎不起作用,除非特定 key 的时间提前。

我有一个自定义的 TimestampExtractor,它将我首选的“流时间”与记录相关联。我的流拓扑伪代码如下(我使用的是 Kafka Streams DSL API):

    source.mapValues(someProcessingLambda)
.flatMap(flattenRecordsLambda)
.groupByKey(Grouped.with(Serdes.ByteArray(), Serdes.ByteArray()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ZERO))
.aggregate(()->null, aggregationLambda)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

我的输入格式如下:

   1 - {"stream_time":"2019-04-09T11:08:36.000-04:00", id:"1", data:"..."}
2 - {"stream_time":"2019-04-09T11:09:36.000-04:00", id:"1", data:"..."}
3 - {"stream_time":"2019-04-09T11:18:36.000-04:00", id:"2", data:"..."}
4 - {"stream_time":"2019-04-09T11:19:36.000-04:00", id:"2", data:"..."}
.
.

现在记录 12 根据 stream_time3 属于一个 10 分钟的窗口>4 属于另一个。在该窗口内,记录根据 id 聚合。我预计记录 3 会发出流已经前进的信号并导致抑制发出与第一个窗口对应的数据。但是,在我发送带有 id:1 的虚拟记录以提前该 key 的流时间之前,不会发出数据。

我对测试说明的理解有误吗?这是预期的行为吗?虚拟记录的 key 重要吗?

最佳答案

很抱歉给您带来麻烦。这确实是一个棘手的问题。我有一些想法可以添加一些操作来支持这种集成测试,但是在不破坏基本的流处理时间语义的情况下很难做到。

听起来您正在测试“真正的”KafkaStreams 应用程序,而不是使用 TopologyTestDriver 进行测试。我的第一个建议是,如果 TopologyTestDriver 满足您的需求,您将有更好的时间来验证您的应用程序语义。

在我看来,您的输入主题(以及您的应用程序)中可能有多个分区。如果 key 1 进入一个分区,而 key 3 进入另一个分区,您将看到您所观察到的情况。应用程序的每个分区独立地跟踪流时间。TopologyTestDriver 工作得很好,因为它只使用一个分区,还因为它同步处理数据。否则,您将不得不制作“虚拟”时间推进消息,以将其发送到与您要清除的 key 相同的分区。

这将特别棘手,因为您的“flatMap().groupByKey()”将重新分区数据。您必须制作虚拟消息,以便它在重新分区后进入正确的分区。或者您可以尝试将您的虚拟消息直接写入重新分区主题。

如果您确实需要使用 KafkaStreams 而不是 TopologyTestDriver 进行测试,我想最简单的方法就是为每个键编写一条“时间推进”消息,正如您在问题中所建议的那样。不是因为它是绝对必要的,而是因为它是满足所有这些警告的最简单方法。我还要提到的是,我们正在对 Kafka Streams 中的流时间处理进行一些一般性改进,这应该会显着简化情况,但当然,这目前对您没有帮助。

关于java - 卡夫卡流 : Should we advance stream time per key to test Windowed suppression?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62805247/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com