
apache-spark - preservesPartitioning true/false gives the same result for mapPartitions

Reposted. Author: 行者123. Updated: 2023-12-04 10:49:12

This is a second attempt here, as the example originally posed was flawed. From the documentation:

preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn't modify the keys.



Nice prose, but what does it actually mean?

Here is a contrived, trivial example: whether I pass true or false to mapPartitions, the per-partition distribution of the data in the new RDD stays the same, even though I change the K of the (K, V) pairs. So what is the point? There must be something basic that I am missing.
import org.apache.spark.HashPartitioner

// Some contrived function that rewrites the keys of a pair RDD
def myfunc(iter: Iterator[(String, (Int, String))]): Iterator[(String, (Int, String))] = {
  iter.map { case (x, y) => ("B" + x + "A", y) }
}

val rdd1 = sc.parallelize(1 to 9)
  .map(x => ("RFD" + x + "ABC", (1000, "xc888x" + x)))
  .partitionBy(new HashPartitioner(459))
val rdd2 = rdd1.mapPartitions(myfunc, true) // or false
rdd2.collect
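The per-record partition report quoted below can be produced with mapPartitionsWithIndex. A minimal sketch, assuming the same spark-shell session with rdd2 defined as above (not standalone code):

```scala
// Tag every record with the index of the partition it currently lives in.
// mapPartitionsWithIndex passes the partition index alongside the iterator.
val located = rdd2.mapPartitionsWithIndex { (idx, iter) =>
  iter.map { kv => s"$kv -> $idx" }
}
located.collect
```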

The output of rdd2 via mapPartitionsWithIndex shows, for both true and false:
res21: Array[String] = Array((BRFD5ABCA,(1000,xc888x5)) -> 22, (BRFD4ABCA,(1000,xc888x4)) -> 66, (BRFD3ABCA,(1000,xc888x3)) -> 110, (BRFD2ABCA,(1000,xc888x2)) -> 154, (BRFD1ABCA,(1000,xc888x1)) -> 198, (BRFD9ABCA,(1000,xc888x9)) -> 305, (BRFD8ABCA,(1000,xc888x8)) -> 349, (BRFD7ABCA,(1000,xc888x7)) -> 393, (BRFD6ABCA,(1000,xc888x6)) -> 437)

This is the same partition distribution as rdd1.
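That the data stays put is expected: mapPartitions never moves records between partitions regardless of the flag; the placement itself was fixed earlier by HashPartitioner. Its assignment rule can be sketched in self-contained Scala (a sketch mirroring org.apache.spark.HashPartitioner.getPartition; the helper names here are illustrative, not Spark's public API):

```scala
// Non-negative modulo, as used by Spark's HashPartitioner: Scala's %
// can return a negative value for negative hash codes, so fold it back
// into the [0, mod) range.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

// Partition index for a key: hashCode modulo the number of partitions.
def partitionFor(key: Any, numPartitions: Int): Int =
  nonNegativeMod(key.hashCode, numPartitions)

// Where the question's original keys land with 459 partitions:
(1 to 9).foreach { i =>
  val key = "RFD" + i + "ABC"
  println(s"$key -> ${partitionFor(key, 459)}")
}
```

Passing true or false to mapPartitions does not rerun this assignment; it only tells Spark whether the existing assignment metadata is still trustworthy.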

So what is the point of true/false for preservesPartitioning?

Best Answer

This was very unintuitive to me too. I can quote a statement from the Apache Spark User List that fits your question:

This is not what preservesPartitioning does -- actually what it means is that if the RDD has a Partitioner set (which means it's an RDD of key-value pairs and the keys are grouped into a known way, e.g. hashed or range-partitioned), your map function is not changing the partition of keys. This lets the job scheduler know that downstream operations, like joins or reduceByKey, can be optimized assuming that all the data for a given partition is located on the same machine. In both cases though, your function f operates on each partition.



In your example, consider the following code with preservesPartitioning = false:
val rdd2 = rdd1.mapPartitions(myfunc, false)
rdd2.groupByKey().map{case (key,values) => values.size}.toDebugString

This gives:
(459) MapPartitionsRDD[5] at map at Spark.scala:44 []
| ShuffledRDD[4] at groupByKey at Spark.scala:44 []
+-(459) MapPartitionsRDD[3] at mapPartitions at Spark.scala:42 []
| ShuffledRDD[2] at partitionBy at Spark.scala:41 []
+-(4) MapPartitionsRDD[1] at map at Spark.scala:41 []
| ParallelCollectionRDD[0] at parallelize at Spark.scala:41 []

while with preservesPartitioning = true:
(459) MapPartitionsRDD[5] at map at Spark.scala:44 []
| MapPartitionsRDD[4] at groupByKey at Spark.scala:44 []
| MapPartitionsRDD[3] at mapPartitions at Spark.scala:42 []
| ShuffledRDD[2] at partitionBy at Spark.scala:41 []
+-(4) MapPartitionsRDD[1] at map at Spark.scala:41 []
| ParallelCollectionRDD[0] at parallelize at Spark.scala:41 []

So in the first case, groupByKey causes an extra shuffle, because Spark does not know that the keys reside in the same partition (the partitioner is lost), while in the second case, groupByKey is translated into a simple mapPartitions, because Spark knows that the first mapPartitions did not change the partitioning, i.e. that all the keys are still in the same partitions.
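A more direct way to observe the difference, without reading toDebugString, is to inspect the partitioner field of the resulting RDD. A sketch, assuming the same spark-shell session as in the question:

```scala
// With false, the partitioner metadata is dropped: downstream key-based
// operations must shuffle.
rdd1.mapPartitions(myfunc, preservesPartitioning = false).partitioner
// None

// With true, the HashPartitioner(459) is carried over, so groupByKey can
// reuse it and skip the shuffle. Note that myfunc actually rewrites the
// keys, so per the documentation quoted above, true is incorrect here:
// the new keys are no longer located where HashPartitioner(459) would
// place them, and key-based operations may silently produce wrong groups.
rdd1.mapPartitions(myfunc, preservesPartitioning = true).partitioner
```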

Regarding apache-spark - preservesPartitioning true/false gives the same result for mapPartitions, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59569167/
