gpt4 book ai didi

python - 如何在 Apache Beam/Google 数据流中将大窗口缩小为较小的窗口?

转载 作者:行者123 更新时间:2023-12-01 07:24:17 26 4
gpt4 key购买 nike

我遇到一个问题,在处理大型数据集时,我的管道不断失败。除了有关重新洗牌期间发生的处理暂停的警告之外,工作日志不显示任何错误。我怀疑我通过超出分配的内存来杀死工作人员,因为正在发生的事情的唯一提示是我可以看到工作人员在日志中旋转,但然后继续什么也不做。最终,如果我等待足够长的时间或终止管道,它们就会失败。

我想将我的元素减少到几个并发运行的组,以便插入到elasticsearch中。例如,从 40 个工作人员进行处理变为仅 7 个工作人员向 ES 进行批量插入。

我在处理和elasticsearch插入之间放置了窗口。我有日志记录语句,我可以看到,尽管我使用 AfterCount 进行窗口化,但窗口大小似乎在很大程度上不受限制。 IE。我将窗口大小设置为 1000,得到一组 12k。我认为问题在于 Apache Beam 在 bundle 上运行,并且仅在处理 bundle 后才触发,并且我的转换之一可能会向集合生成任意数量的输出元素。

完成这项任务的预期方法是什么?

想象我想要发生的事情:

1000 个项目/50 组窗口 -> 每个窗口输出 500,000 多个文档 -> 插入 7 个工作人员,每批 2k 个文档

当前的管道流程(我已经尝试了很多变体):

Read from datastore
| window by timestamp (5 seconds) with early trigger of 100 elements
| group by random int between 0 and 50 (need groups for batch fetch call in processing)
| fetch from service in batch and process with output documents
| window by timestamp (10 seconds) with early trigger of 1000 documents
| group by random int between 0 and X
| insert into ES

我尝试了各种 X 值。较低的值会导致较高的插入吞吐量,但在处理大量数据时,管道会在插入步骤失败。现在尝试运行 X=80,我发现吞吐量适中,但出现了一些超时,并且通常需要几秒或更短时间的批处理调用现在需要 15 秒或更长时间才能完成。

向插入添加更多工作人员似乎可以解决工作人员根本无法执行任何插入的问题,但是大量的批处理请求效率非常低,而且它们最终需要更长的时间才能完成,并且存在超时和集群过载的风险.

为了更好地表达,此时我只是在尝试不同的参数,并且肯定有一种方法来设计管道,因此无论数据或窗口大小如何,这都不是问题。

最佳答案

我不确定工作人员默默失败的根本原因是什么,但是当您使用 AfterCount 触发器时,您的窗口没有被绑定(bind)的原因是因为触发器仅在流中工作管道。由于您的管道是批处理管道,因此计数将被忽略。

解决方案是避免使用窗口、触发器和分组将元素一起批处理,而是将其替换为 BatchElements转换,这似乎正是您所需要的。这样,您的管道将如下所示:

Read from datastore
| batch into X elements
| fetch from service in batch and process with output documents
| batch into Y documents
| insert into ES

关于python - 如何在 Apache Beam/Google 数据流中将大窗口缩小为较小的窗口?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57532736/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com