gpt4 book ai didi

apache-spark - Apache Spark : What is the equivalent implementation of RDD. groupByKey() 使用 RDD.aggregateByKey()?

转载 作者:行者123 更新时间:2023-12-04 01:41:47 25 4
gpt4 key购买 nike

Apache Spark pyspark.RDD API 文档提到 groupByKey()效率低下。相反,建议使用 reduceByKey() , aggregateByKey() , combineByKey() , 或 foldByKey()反而。这将导致在 shuffle 之前在工作线程中进行一些聚合,从而减少跨工作线程的数据混洗。

给定以下数据集和 groupByKey()表达式,什么是不使用 groupByKey() 的等效且有效的实现(减少跨 worker 数据混洗) ,但提供相同的结果?

dataset = [("a", 7), ("b", 3), ("a", 8)]
rdd = (sc.parallelize(dataset)
.groupByKey())
print sorted(rdd.mapValues(list).collect())

输出:
[('a', [7, 8]), ('b', [3])]

最佳答案

据我所知,在这种特殊情况下,使用 aggregateByKey 没有任何好处*或类似的功能。由于您正在构建一个列表,因此没有“真正的”减少,必须改组的数据量或多或少相同。

要真正观察到一些性能提升,您需要进行实际减少传输数据量的转换,例如计数、计算汇总统计数据、查找唯一元素。

关于使用 reduceByKey() 的不同好处, combineByKey() , 或 foldByKey()当您考虑 Scala API 签名时,有一个重要的概念差异。

两者 reduceByKeyfoldByKey map 来自 RDD[(K, V)]RDD[(K, V)]而第二个提供额外的零元素。

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]
combineByKey (没有 aggregateByKey ,但它是相同类型的转换)从 RDD[(K, V)] 转换而来至 RDD[(K, C)] :

combineByKey[C](
createCombiner: (V) ⇒ C,
mergeValue: (C, V) ⇒ C,
mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

仅回到您的示例 combineByKey (在 PySpark aggregateByKey 中)确实适用,因为您是从 RDD[(String, Int)] 转换而来的至 RDD[(String, List[Int])] .

而在像 Python 这样的动态语言中,实际上可以使用 foldByKey 来执行这样的操作。或 reduceByKey它使代码的语义变得不清楚,并引用@tim-peters “应该有一种——最好只有一种——明显的方法来做到这一点”[1]。
aggregateByKey之间的区别和 combineByKeyreduceByKey 之间几乎相同和 foldByKey所以对于一个列表来说,它主要是一个品味问题:

def merge_value(acc, x):
acc.append(x)
return acc

def merge_combiners(acc1, acc2):
acc1.extend(acc2)
return acc1

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
.combineByKey(
lambda x: [x],
lambda u, v: u + [v],
lambda u1,u2: u1+u2))

在实践中你应该更喜欢 groupByKey尽管。与上面提供的简单实现相比,PySpark 实现明显更加优化。

1.Peters, T. PEP 20 -- Python 之禅。 (2004)。在 https://www.python.org/dev/peps/pep-0020/

* 在实践中,这里实际上有很多松散的地方,尤其是在使用 PySpark 时。 groupByKey的Python实现比朴素的按键组合明显更优化。您可以查看 Be Smart About groupByKey ,由我和 @eliasah 创建进行额外的讨论。

关于apache-spark - Apache Spark : What is the equivalent implementation of RDD. groupByKey() 使用 RDD.aggregateByKey()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31081563/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com