gpt4 book ai didi

apache-spark - pyspark折叠方法输出

转载 作者:行者123 更新时间:2023-12-03 23:51:23 25 4
gpt4 key购买 nike

我对 fold 的输出感到惊讶,我无法想象它在做什么。

我希望 something.fold(0, lambda a,b: a+1) 会返回 something 中的元素数,因为折叠开始于0 并为每个元素添加 1

sc.parallelize([1,25,8,4,2]).fold(0,lambda a,b:a+1 )
8

我来自 Scala,其中 fold 就像我所描述的那样工作。那么 fold 应该如何在 pyspark 中工作?谢谢你的想法。

最佳答案

要了解这里发生了什么,让我们看一下 Spark 的 fold 操作的定义。由于您使用的是 PySpark,我将展示代码的 Python 版本,但 Scala 版本表现出完全相同的行为(您也可以 browse the source on GitHub ):

def fold(self, zeroValue, op):
"""
Aggregate the elements of each partition, and then the results for all
the partitions, using a given associative function and a neutral "zero
value."
The function C{op(t1, t2)} is allowed to modify C{t1} and return it
as its result value to avoid object allocation; however, it should not
modify C{t2}.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
"""
def func(iterator):
acc = zeroValue
for obj in iterator:
acc = op(obj, acc)
yield acc
vals = self.mapPartitions(func).collect()
return reduce(op, vals, zeroValue)

(对比见 Scala implementation of RDD.fold)。

Spark 的 fold 操作是先折叠每个分区,然后再折叠结果。问题是空分区被折叠到零元素,因此最终的驱动程序端折叠最终会为 每个 分区折叠一个值,而不是为每个 非空 分区。这意味着 fold 的结果对分区数很敏感:

>>> sc.parallelize([1,25,8,4,2], 100).fold(0,lambda a,b:a+1 )
100
>>> sc.parallelize([1,25,8,4,2], 50).fold(0,lambda a,b:a+1 )
50
>>> sc.parallelize([1,25,8,4,2], 1).fold(0,lambda a,b:a+1 )
1

在最后一种情况下,发生的情况是单个分区被折叠到正确的值,然后该值在驱动程序处与零值折叠以产生 1。

看来Spark的fold()操作实际上要求fold函数除了关联之外还具有可交换性。实际上,Spark 中还有其他地方会强制执行此要求,例如,混洗分区中元素的顺序在运行中可能是不确定的(参见 SPARK-5750)。

我已打开 Spark JIRA 票证来调查此问题:https://issues.apache.org/jira/browse/SPARK-6416 .

关于apache-spark - pyspark折叠方法输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29150202/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com