gpt4 book ai didi

google-cloud-dataflow - 如何为 Beam 管道中的 session 窗口编写单元测试?

转载 作者:行者123 更新时间:2023-12-02 00:59:02 26 4
gpt4 key购买 nike

我正在编写一个处理产品事件(创建、更新、删除)的管道。每个产品都属于具有特定持续时间的销售。我希望能够对给定销售中的所有产品执行一些聚合。出于本示例的目的,假设我只需要每次销售的唯一产品 ID 列表。

因此,我的管道在间隔持续时间很长的销售 ID 上使用 session 窗口(因此,当销售结束并且没有发布更多产品更新时,该销售窗口也会关闭)。我的问题是,如何为此编写单元测试?

为了这个测试,我们假设如下:

  • 事件只是带有销售 ID 和产品 ID 的字符串,以空格分隔,
  • applyDistinctProductsTransform基本上会执行我上面所说的。创建KV<String, String>关键是销售 ID 的元素;将 session 窗口设置为 600 秒的间隔时间;最后为每次销售创建所有产品 ID 的串联字符串。

这是我目前所拥有的:

我创建了一个 TestStream并添加一些元素:sale1 的 3 个产品.接下来,我将水印提高到 700,远远超过间隙持续时间。添加另一个产品,最后将水印推进到无穷大。

@Test
public void TestSessionWindow() {
Coder<String> utfCoder = StringUtf8Coder.of();
TestStream<String> onTimeProducts =
TestStream.create(utfCoder).addElements(
TimestampedValue.of("sale1 product1", new Instant(0)),
TimestampedValue.of("sale1 product2", new Instant(0)),
TimestampedValue.of("sale1 product3", new Instant(0))
)
.advanceWatermarkTo(new Instant(700)) // watermark passes trigger time
.addElements(
TimestampedValue.of("campaign1 product9", new Instant(710))
)
.advanceWatermarkToInfinity();

PCollection<KV<String, String>> results = applyDistinctProductsTransform(pipeline, onTimeProducts);

PAssert.that(results).containsInAnyOrder(
KV.of("sale1", "product1,product2,product3"),
KV.of("sale1", "product9")
);
pipeline.run().waitUntilFinish();
}

但是,

  1. 管道输出 sale1 的 KV , product1,product2,product3,product9所以product9附加到窗口。我本来希望这个产品在一个单独的窗口中处理,因此最终出现在输出 PCollection 的不同行中。
  2. 如何在 PAssert 中只获取单个窗口的结果?我知道有 inWindow函数,我找到了一个 fixed time window 的例子但我不知道如何为 session 窗口做同样的事情。

您可以查看 PTransform 的完整代码和 unit test .

最佳答案

1) 我相信你有一个简单的单位问题。以秒为单位指定窗口间隙持续时间 600 Duration.standardSecondsnew Instant(long)使用毫秒,这意味着 600 秒的间隔大于导致 session 合并的 700 毫秒的时间间隔。

2) session 在内部仍然使用间隔窗口。您将需要根据您的触发策略计算所有 session 合并后的输出窗口。默认情况下, session 窗口使用 IntervalWindow(timestamp, gap duration) , 并合并所有 overlapping windows创建一个更大的窗口。例如,如果您有相同 session key 的窗口(开始时间、结束时间)、[10、14]、[12、18]、[4、14],它们将全部合并生成一个 [4, 18] 窗口。

关于google-cloud-dataflow - 如何为 Beam 管道中的 session 窗口编写单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51994579/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com