gpt4 book ai didi

google-cloud-dataflow - 具体版本 : PubSub/Dataflow acknowledgement of unbounded data

转载 作者:行者123 更新时间:2023-12-02 01:00:04 24 4
gpt4 key购买 nike

我有一个项目,它有一个 apache beam 管道,其依赖项的设置方式使得我必须使用 PubSub 的 0.20.0-beta 版。此管道一直运行(无限制)。

[+] 问题:PubSub 消息每 30 分钟左右重复一次。

[+] 我尝试过的:我读过许多解决方案,其中提到数据流运行器如何具有确认发生的检查点。我还读过,使用 GroupByKey 等 PTransform 可以更快地确认这些消息。所以我尝试了窗口化、按键触发和分组,但我仍然从 PubSub 收到重复的消息。

[+] 问题:我究竟做错了什么?为什么消息没有被确认? (如果我理解正确,它不会在管道结束执行之前被确认??但我的管道需要很长时间,如何提前确认?)

这是特定于 0.20.0-beta 的“版本”错误,还是我应该能够将 PubsubIO.Reader 与窗口和触发一起使用以便更早确认?

[+]代码:

窗口时间为 10 秒,PubSub ack 截止时间为 60 秒。

     .apply("Listen_To_PubSub", PubsubIO.readStrings().fromSubscription(subscription))
.apply("Windowing", Window.<String> into(window).triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(timeLimit)).withAllowedLateness(Duration.ZERO).discardingFiredPanes())
.apply("DeleteFromBQ", ParDo.of(new DeleteFromBQ()))
.apply("Mapping", ParDo.of(new Mapping()))
.apply("GroupByKey", GroupByKey.<String,String>create())
.apply("Acknowledge", ParDo.of(new Grouped()))
.apply("DoSomething1", ParDo.of(new DoSomething1()))
.apply("Flatten_Iterable", Flatten.iterables())
.apply("DoSomething2", ParDo.of(new DoSomething2()))
.apply("DoSomething3", ParDo.of(new DoSomething3()))
.apply("DoSomething4", ParDo.of(new DoSomething4()))
.apply("Write_To_BigQuery", BigQueryIO.writeTableRows()
.to(output)
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
);

提前致谢!任何输入表示赞赏。

最佳答案

当您应用如此多的转换时,您似乎超过了 60 秒的确认期限。要查看需要多长时间,我建议使用 Logging Pipeline Messages .我认为您可能需要尽快移动确认。

您可以做的另一件事是使用更高的机器类型来更快地处理消息。

关于google-cloud-dataflow - 具体版本 : PubSub/Dataflow acknowledgement of unbounded data,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51271594/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com