
apache-spark - Spark reduceByKey and how to minimize shuffling


I am working with a table like this:

ID   f1
001  1
001  2
001  3
002  0
002  7

I want to compute the sum of the f1 column for rows with the same ID and add a new column holding that sum, i.e.:

ID   f1  sum_f1
001  1   6
001  2   6
001  3   6
002  0   7
002  7   7

My solution is to compute the sum with reduceByKey and then join the result back to the original table:

val table = sc.parallelize(Seq(("001",1),("001",2),("001",3),("002",0),("002",7)))
val sum = table.reduceByKey(_ + _)
val result = table.leftOuterJoin(sum).map{ case (a,(b,c)) => (a, b, c.getOrElse(-1) )}

I get the correct result:

result.collect.foreach(println)

Output:

(002,0,7)
(002,7,7)
(001,1,6)
(001,2,6)
(001,3,6)

The problem is that this code has two shuffle stages, one for reduceByKey and another for leftOuterJoin. If I wrote the job in Hadoop MapReduce, it would be easy to get the same result with only one shuffle stage (by calling output.collect more than once in the reduce phase). So I wonder whether there is a better way to do this with a single shuffle. Any suggestion would be appreciated.
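
One way to check the number of shuffles is to inspect the RDD lineage. A minimal sketch (not part of the original question), reusing the result RDD defined above:

// Each ShuffledRDD / CoGroupedRDD in the lineage marks a stage boundary,
// i.e. a shuffle; here the reduceByKey and the leftOuterJoin each introduce one.
println(result.toDebugString)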

Best Answer

An alternative is to use aggregateByKey. It may be a somewhat harder method to understand, but from the Spark docs:

(groupByKey) Note: This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance.

Also, aggregateByKey is a generic function, so it is worth knowing.

Of course we are not doing a "simple aggregation such as sum" here, so the performance advantage of this approach over groupByKey may not materialize. Obviously it is a good idea to benchmark both approaches on real data.
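
For comparison, here is a sketch of the groupByKey variant mentioned above (not from the original answer; it reuses the table RDD from the question): group all values per key, compute the sum once per group, and re-emit each value together with that sum.

// Sketch of the groupByKey alternative: one shuffle, but all values for a key
// are materialized in memory on a single partition before summing.
import org.apache.spark.rdd.RDD

val viaGroupByKey: RDD[(String, Int, Int)] = table
  .groupByKey()
  .flatMap { case (key, values) =>
    val sum = values.sum
    values.map(value => (key, value, sum))
  }

viaGroupByKey.collect().foreach(println)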

Here is the aggregateByKey implementation:

// The input as given by OP here: http://stackoverflow.com/questions/36455419/spark-reducebykey-and-keep-other-columns
// RDD is imported for the explicit type annotations used below.
import org.apache.spark.rdd.RDD

val table = sc.parallelize(Seq(("001", 1), ("001", 2), ("001", 3), ("002", 0), ("002", 7)))

// zero is initial value into which we will aggregate things.
// The second element is the sum.
// The first element is the list of values which contributed to this sum.
val zero = (List.empty[Int], 0)

// sequencer will receive an accumulator and the value.
// The accumulator will be reset for each key to 'zero'.
// In this sequencer we add value to the sum and append to the list because
// we want to keep both.
// This can be thought of as "map" stage in classic map/reduce.
def sequencer(acc: (List[Int], Int), value: Int) = {
val (values, sum) = acc
(value :: values, sum + value)
}

// combiner combines two lists and sums into one.
// The reason for this is the sequencer may run in different partitions
// and thus produce partial results. This step combines those partials into
// one final result.
// This step can be thought of as "reduce" stage in classic map/reduce.
def combiner(left: (List[Int], Int), right: (List[Int], Int)) = {
(left._1 ++ right._1, left._2 + right._2)
}

// wiring it all together.
// Note the type of result it produces:
// Each key will have a list of values which contributed to the sum, plus the sum itself.
val result: RDD[(String, (List[Int], Int))] = table.aggregateByKey(zero)(sequencer, combiner)

// To turn this to a flat list and print, use flatMap to produce:
// (key, value, sum)
val flatResult: RDD[(String, Int, Int)] = result.flatMap(result => {
val (key, (values, sum)) = result
for (value <- values) yield (key, value, sum)
})

// collect and print
flatResult.collect().foreach(println)

This produces:

(001,1,6)
(001,2,6)
(001,3,6)
(002,0,7)
(002,7,7)

There is also a gist with a fully runnable version if you want to refer to it: https://gist.github.com/ppanyukov/253d251a16fbb660f225fb425d32206a

Regarding apache-spark - Spark reduceByKey and how to minimize shuffling, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36455419/
