gpt4 book ai didi

python-2.7 - Dataproc Pyspark 作业仅在一个节点上运行

转载 作者:可可西里 更新时间:2023-11-01 15:11:22 28 4
gpt4 key购买 nike

我的问题是我的 pyspark 作业没有并行运行。

代码和数据格式:
我的 PySpark 看起来像这样(显然是经过简化的):

class TheThing:
def __init__(self, dInputData, lDataInstance):
# ...
def does_the_thing(self):
"""About 0.01 seconds calculation time per row"""
# ...
return lProcessedData

#contains input data pre-processed from other RDDs
#done like this because one RDD cannot work with others inside its transformation
#is about 20-40MB in size
#everything in here loads and processes from BigQuery in about 7 minutes
dInputData = {'dPreloadedData': dPreloadedData}

#rddData contains about 3M rows
#is about 200MB large in csv format
#rddCalculated is about the same size as rddData
rddCalculated = (
rddData
.map(
lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
)
)

llCalculated = rddCalculated.collect()
#save as csv, export to storage

在 Dataproc 集群上运行:
集群是通过 Dataproc UI 创建的.
作业是这样执行的:
gcloud --project <project> dataproc jobs submit pyspark --cluster <cluster_name> <script.py>

我通过 UI 观察了工作状态,started like this .浏览它时,我注意到我的工作节点中只有一个(看似随机的)在做任何事情。所有其他人都完全闲置。

PySpark 的重点是并行运行这个东西,显然不是这样。我已经在各种集群配置中运行了这些数据,最后一个是巨大的,这是我注意到它是单节点使用的时候。因此,为什么我的工作需要很长时间才能完成,而且时间似乎与集群大小无关。

所有使用较小数据集的测试在我的本地机器和集群上都毫无问题地通过了。我真的只需要高档。

编辑
我变了
llCalculated = rddCalculated.collect()
#... save to csv and export

rddCalculated.saveAsTextFile("gs://storage-bucket/results")

并且只有一个节点仍在执行工作。

最佳答案

根据您是从 GCS 还是 HDFS 加载 rddData,默认分割大小可能是 64MB 或 128MB,这意味着您的 200MB 数据集只有 2-4 个分区。 Spark 之所以这样做,是因为典型的基本数据并行任务处理数据的速度足够快,64MB-128MB 意味着可能需要数十秒的处理时间,因此拆分成较小的并行 block 没有任何好处,因为启动开销将占主导地位。

在您的情况下,听起来每 MB 的处理时间要长得多,因为您加入了另一个数据集,并且可能对每条记录执行了相当重量级的计算。所以你需要更多的分区,否则无论你有多少节点,Spark 都不知道要拆分成超过 2-4 个工作单元(如果每台机器都可能打包到一台机器上)有多个核心)。

所以你只需要调用repartition:

rddCalculated = (
rddData
.repartition(200)
.map(
lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
)
)

或者将重新分区添加到较早的行:

rddData = rddData.repartition(200)

或者如果你在读取时重新分区,你可能会有更好的效率:

rddData = sc.textFile("gs://storage-bucket/your-input-data", minPartitions=200)

关于python-2.7 - Dataproc Pyspark 作业仅在一个节点上运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37589472/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com