gpt4 book ai didi

scala - 合并减少了整个阶段的并行度(spark)

转载 作者:行者123 更新时间:2023-12-03 04:16:05 25 4
gpt4 key购买 nike

有时 Spark 以低效的方式“优化”数据帧计划。考虑 Spark 2.1 中的以下示例(也可以在 Spark 1.6 中重现):

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})

val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))

df_result
.coalesce(1)
.saveAsTable(tablename)

在此示例中,我想在对数据帧进行昂贵的转换后写入 1 个文件(这只是演示该问题的示例)。 Spark 将 coalesce(1) 向上移动,以便 UDF 仅应用于包含 1 个分区的数据帧,从而破坏并行性(有趣的是,repartition(1) 不会以这种方式运行)。

概括而言,当我想要增加转换的某个部分的并行度,但此后减少并行度时,就会发生这种行为。

我找到了一种解决方法,其中包括缓存数据帧,然后触发数据帧的完整评估:

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})

val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
.cache

df_result.rdd.count // trigger computation

df_result
.coalesce(1)
.saveAsTable(tablename)

我的问题是:在这种情况下是否有另一种方法告诉 Spark 不要减少并行性?

最佳答案

其实并不是SparkSQL的优化,SparkSQL并没有改变Coalesce算子的位置,执行计划显示:

Coalesce 1
+- *Project [value#2, UDF(value#2) AS udfResult#11]
+- *SerializeFromObject [input[0, double, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]

我引用coalesce API's description中的一段话:

注意:本段由 jira SPARK-19399 添加。所以在2.0的API中应该找不到它。

However, if you're doing a drastic coalesce, e.g. to numPartitions =1, this may result in your computation taking place on fewer nodesthan you like (e.g. one node in the case of numPartitions = 1). Toavoid this, you can call repartition. This will add a shuffle step,but means the current upstream partitions will be executed in parallel(per whatever the current partitioning is).

合并 API 不执行洗牌,但会导致先前 RDD 和当前 RDD 之间的狭窄依赖关系。由于RDD是惰性计算,计算实际上是通过合并分区完成的。

为了防止这种情况,您应该使用重新分区 API。

关于scala - 合并减少了整个阶段的并行度(spark),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44494656/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com