
google-cloud-dataflow - Inputs to Flatten had incompatible window windowFns when using CoGroupByKey with CalendarWindows

Reposted. Author: 行者123. Last updated: 2023-12-04 21:35:22

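TL;DR:
How can I CoGroupByKey a set of PCollections that all use the same windowing strategy, set with CalendarWindows?
Long version
I'm writing a Dataflow pipeline that reads from two different Pub/Sub sources. One of the resulting PCollections is split into a PCollectionTuple, and at the end I try to join them in a CoGroupByKey before saving the result to BigQuery.
While testing the pipeline, my windowing strategy for the PCollections was: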

private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){
return summary
.apply("Apply Windows " + OperationName, Window
.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardDays(1)))
.apply("Count " + OperationName, Count.perKey());
}
I set them up with fixed windows of 1 minute so that I could get results quickly.
My grouping looks like this:
private static PCollection<KV<String, CoGbkResult>> MergeSummary(PCollection<KV<String, Long>> Avail, PCollection<KV<String, Long>> ValuationOK, PCollection<KV<String, Long>> ValuationKO){
return KeyedPCollectionTuple.of(Util.AVAIL, Avail)
.and(Util.VALUATION_OK, ValuationOK)
.and(Util.VALUATION_KO, ValuationKO)
.apply("Merge Summary", CoGroupByKey.create());
}
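While I was testing, both locally and in the cloud, this ran smoothly. However, once I set the windows to the real production values, calendar windows of 1 day, like this: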
private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){
return summary
.apply("Apply Windows " + OperationName, Window
.<KV<String, Long>>into(CalendarWindows.days(1).withTimeZone(DateTimeZone.UTC).withStartingDay(2016,9,20)) //Per day windowing.
.discardingFiredPanes()
.withAllowedLateness(Duration.standardDays(1))) //Accepts X days late data.
.apply("Count " + OperationName, Count.perKey());
}
Then I can't even get the pipeline to start, because constructing it fails with a message like this:
Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible window windowFns: com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows$DaysWindows@6af9fcb2, com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows$DaysWindows@6cce16f4
at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:121)
at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:105)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollectionList.apply(PCollectionList.java:175)
at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:124)
at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:74)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290)
at com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:116)
Reading the documentation, I found this:

When using CoGroupByKey to group PCollections that have a windowing strategy applied, all of the PCollections you want to group must use the same windowing strategy and window sizing. For example, all the collections you're merging must use (hypothetically) identical 5-minute fixed windows or 4-minute sliding windows starting every 30 seconds.

If your pipeline attempts to use CoGroupByKey to merge PCollections with incompatible windows, Dataflow will generate an IllegalStateException error when your pipeline is constructed.


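So I understand that Dataflow thinks my PCollections have incompatible windows. However, all of these windows were applied with the function I pasted above. How, then, can I CoGroupByKey a set of PCollections that share the same windowing strategy, set with CalendarWindows?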

Best answer

This looks like a bug in CalendarWindows. To work around it, create a single CalendarWindows object and use it as the WindowFn for every PCollection, instead of creating a separate CalendarWindows object for each PCollection.
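The error message lists two DaysWindows instances that differ only in their identity hash, which suggests (this is an inference, not something confirmed in the answer) that the compatibility check ends up comparing object references rather than settings. The following is a minimal, SDK-free sketch of that situation. `DailyWindows`, `newDailyWindows` and `allCompatible` are hypothetical stand-ins invented for illustration; they are not part of the Dataflow SDK.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;

public class SharedWindowFnSketch {

    // Hypothetical stand-in for a window function such as the one returned by
    // CalendarWindows.days(1). It does not override equals(), so two instances
    // with identical settings are still distinct objects.
    static final class DailyWindows {
        final int days;
        final LocalDate startingDay;

        DailyWindows(int days, LocalDate startingDay) {
            this.days = days;
            this.startingDay = startingDay;
        }
    }

    // Builds a fresh instance on every call, the way applyWindowsAndCount
    // in the question builds a fresh CalendarWindows object per PCollection.
    static DailyWindows newDailyWindows() {
        return new DailyWindows(1, LocalDate.of(2016, 9, 20));
    }

    // Assumed model of the failing check: inputs are accepted only if they
    // all refer to the very same window function object.
    static boolean allCompatible(List<DailyWindows> windowFns) {
        DailyWindows first = windowFns.get(0);
        for (DailyWindows fn : windowFns) {
            if (fn != first) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // One window function per input: same settings, different objects.
        boolean separate = allCompatible(
                Arrays.asList(newDailyWindows(), newDailyWindows(), newDailyWindows()));

        // The workaround: create the window function once and reuse it.
        DailyWindows shared = newDailyWindows();
        boolean reused = allCompatible(Arrays.asList(shared, shared, shared));

        System.out.println("separate instances compatible: " + separate);
        System.out.println("shared instance compatible: " + reused);
    }
}
```

In the pipeline from the question, the equivalent change would be to build `CalendarWindows.days(1).withTimeZone(DateTimeZone.UTC).withStartingDay(2016,9,20)` once, for example in a static final field, and pass that same object to every `Window.into(...)` call.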

Regarding google-cloud-dataflow - Inputs to Flatten had incompatible window windowFns when using CoGroupByKey with CalendarWindows, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/39617897/
