gpt4 book ai didi

python - 为什么 Apache Beam python 中的 GroupByKey 之后的 FlatMap 这么慢?

转载 作者:太空宇宙 更新时间:2023-11-04 02:43:02 26 4
gpt4 key购买 nike

鉴于键/值对的数据源相对较小 (3,000-10,000),我尝试仅处理满足组阈值 (50-100) 的记录。因此,最简单的方法是按键对它们进行分组、过滤和展开——使用 FlatMap 或 ParDo。迄今为止,最大的一组只有 1,500 条记录。但这似乎是 Google Cloud Dataflow 生产中的一个严重瓶颈。

根据给定的列表

(1, 1)(1, 2)(1, 3)...(2, 1)(2, 2)(2, 3)...

运行一组转换以按键过滤和分组:

p | 'Group' >> beam.GroupByKey()
| 'Filter' >> beam.Filter(lambda (key, values): len(list(values)) > 50)
| 'Unwind' >> beam.FlatMap(lambda (key, values): values)

关于如何提高性能的任何想法?感谢您的帮助!

最佳答案

这是管道的一个有趣的极端情况。我相信您的问题在于您读取来自 GroupByKey 的数据的方式。让我简要介绍一下 GBK 的工作原理。

什么是GroupByKey,大数据系统是如何实现的

所有大数据系统都实现了对同一键的多个元素进行操作的方法。这在 MapReduce 中称为 reduce,在其他大数据系统中称为 Group By Key 或 Combine。

当您执行 GroupByKey 转换时,Dataflow 需要将单个键的所有元素收集到同一台机器中。由于同一 key 的不同元素可能在不同的机器上处理,因此需要以某种方式对数据进行序列化。

这意味着当您读取来自 GroupByKey 的数据时,您正在访问 worker 的 IO(即不是从内存),因此您真的想避免读取随机数据太多次.

这如何转化为您的管道

我认为您的问题在于 FilterUnwind 都将分别从 shuffle 中读取数据(因此您将为每个列表读取数据两次)。你想要做的是只读取一次你的洗牌数据。您可以使用单个 FlatMap 来完成此操作,它既可以过滤又可以展开您的数据,而无需从 shuffle 中重复读取。像这样:

def unwind_and_filter((key, values)):
# This consumes all the data from shuffle
value_list = list(values)
if len(value_list) > 50:
yield value_list

p | 'Group' >> beam.GroupByKey()
| 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter)

如果这有帮助,请告诉我。

关于python - 为什么 Apache Beam python 中的 GroupByKey 之后的 FlatMap 这么慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45889618/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com