gpt4 book ai didi

python - 只能使用分区数相同的 RDD 进行 zip 错误

转载 作者:行者123 更新时间:2023-11-30 23:07:41 25 4
gpt4 key购买 nike

我有一个 ipython 笔记本,其中包含 pyspark 代码,它在我的机器上运行良好,但是当我尝试在另一台机器上运行它时,它会在这一行(rdd3 行)抛出错误:

rdd2 = sc.parallelize(list1) 
rdd3 = rdd1.zip(rdd2).map(lambda ((x1,x2,x3,x4), y): (y,x2, x3, x4))
list = rdd3.collect()

我得到的错误是:

    ValueError                                Traceback (most recent call last)
<ipython-input-7-9daab52fc089> in <module>()

---> 16 rdd3 = rdd1.zip(rdd2).map(lambda ((x1,x2,x3,x4), y): (y,x2, x3, x4))


/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in zip(self, other)
1960
1961 if self.getNumPartitions() != other.getNumPartitions():
-> 1962 raise ValueError("Can only zip with RDD which has the same number of partitions")
1963
1964 # There will be an Exception in JVM if there are different number

我不知道为什么这个错误出现在一台机器上而在另一台机器上却没有? ValueError:只能使用具有相同分区数的 RDD 进行压缩

最佳答案

zip 一般来说是一个棘手的操作。它要求两个 RDD 不仅具有相同的分区数量,而且每个分区的元素数量也相同。

排除一些特殊情况,只有当两个 RDD 具有相同的祖先并且之间不存在可能改变元素数量(filterflatMap)的混洗和操作时,才能保证这一点共同的祖先和当前的状态。通常,它仅意味着map(1 对1)转换。

如果您知道顺序会保留,但分区数量或每个分区的元素数量不同,您可以使用带有索引的 join :

from operator import itemgetter

def custom_zip(rdd1, rdd2):
index = itemgetter(1)
def prepare(rdd, npart):
return (rdd.zipWithIndex()
.sortByKey(index, numPartitions=npart)
.keys())

npart = rdd1.getNumPartitions() + rdd2.getNumPartitions()

return prepare(rdd1, npart).zip(prepare(rdd2, npart))

rdd1 = sc.parallelize(["a_{}".format(x) for x in range(20)], 5)
rdd2 = sc.parallelize(["b_{}".format(x) for x in range(20)], 10)

rdd1.zip(rdd2).take(5)
## ValueError Traceback (most recent call last)
## ...
## ValueError: Can only zip with RDD which has the same number of partitions

custom_zip(rdd1, rdd2).take(5)
## [('a_0', 'b_0'), ('a_1', 'b_1'), ('a_2', 'b_2'),
## ('a_3', 'b_3'), ('a_4', 'b_4')]

Scala 的等价物是这样的:

def prepare[T: ClassTag](rdd: RDD[T], n: Int) = 
rdd.zipWithIndex.sortBy(_._2, true, n).keys

def customZip[T: ClassTag, U: ClassTag](rdd1: RDD[T], rdd2: RDD[U]) = {
val n = rdd1.partitions.size + rdd2.partitions.size
prepare(rdd1, n).zip(prepare(rdd2, n))
}

val rdd1 = sc.parallelize((0 until 20).map(i => s"a_$i"), 5)
val rdd2 = sc.parallelize((0 until 20).map(i => s"b_$i"), 10)

rdd1.zip(rdd2)

// java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
// at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRD
// ...

customZip(rdd1, rdd2).take(5)
// Array[(String, String)] =
// Array((a_0,b_0), (a_1,b_1), (a_2,b_2), (a_3,b_3), (a_4,b_4))

关于python - 只能使用分区数相同的 RDD 进行 zip 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32084368/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com