- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在处理这样的表格:
ID f1
001 1
001 2
001 3
002 0
002 7
我想计算相同ID的f1列的总和,并用总和创建一个新列,即:
ID f1 sum_f1
001 1 6
001 2 6
001 3 6
002 0 7
002 7 7
我的解决方案是使用reduceByKey
计算总和,然后将结果与原始表连接:
val table = sc.parallelize(Seq(("001",1),("001",2),("001",3),("002",0),("002",7)))
val sum = table.reduceByKey(_ + _)
val result = table.leftOuterJoin(sum).map{ case (a,(b,c)) => (a, b, c.getOrElse(-1) )}
我得到了正确的结果:
result.collect.foreach(println)
输出:
(002,0,7)
(002,7,7)
(001,1,6)
(001,2,6)
(001,3,6)
问题是代码中有 2 个 shuffle 阶段,一个在 reduceByKey,另一个在 leftOuterJoin,但是如果我在 Hadoop MapReduce 中编写代码,很容易只用 1 个 shuffle 阶段(超过一旦在 reduce 阶段使用了 outputer.collect
函数)。所以我想知道是否有更好的方法来一次洗牌。任何建议将不胜感激。
最佳答案
另一种方法是使用aggregateByKey
。这可能很难理解方法但来自 spark 文档:
(
groupByKey
) Note: This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, usingPairRDDFunctions.aggregateByKey
orPairRDDFunctions.reduceByKey
will provide much better performance.
此外,aggregateByKey
是一个通用函数,因此值得了解。
当然我们这里不是做“sum之类的简单聚合”所以这种方法与 groupByKey
相比的性能优势可能不存在。显然,在真实数据上对这两种方法进行基准测试是个好主意。
具体实现如下:
// The input as given by OP here: http://stackoverflow.com/questions/36455419/spark-reducebykey-and-keep-other-columns
val table = sc.parallelize(Seq(("001", 1), ("001", 2), ("001", 3), ("002", 0), ("002", 7)))
// zero is initial value into which we will aggregate things.
// The second element is the sum.
// The first element is the list of values which contributed to this sum.
val zero = (List.empty[Int], 0)
// sequencer will receive an accumulator and the value.
// The accumulator will be reset for each key to 'zero'.
// In this sequencer we add value to the sum and append to the list because
// we want to keep both.
// This can be thought of as "map" stage in classic map/reduce.
def sequencer(acc: (List[Int], Int), value: Int) = {
val (values, sum) = acc
(value :: values, sum + value)
}
// combiner combines two lists and sums into one.
// The reason for this is the sequencer may run in different partitions
// and thus produce partial results. This step combines those partials into
// one final result.
// This step can be thought of as "reduce" stage in classic map/reduce.
def combiner(left: (List[Int], Int), right: (List[Int], Int)) = {
(left._1 ++ right._1, left._2 + right._2)
}
// wiring it all together.
// Note the type of result it produces:
// Each key will have a list of values which contributed to the sum, sum the sum itself.
val result: RDD[(String, (List[Int], Int))] = table.aggregateByKey(zero)(sequencer, combiner)
// To turn this to a flat list and print, use flatMap to produce:
// (key, value, sum)
val flatResult: RDD[(String, Int, Int)] = result.flatMap(result => {
val (key, (values, sum)) = result
for (value <- values) yield (key, value, sum)
})
// collect and print
flatResult.collect().foreach(println)
这会产生:
(001,1,6)
(001,2,6)
(001,3,6)
(002,0,7)
(002,7,7)
这里还有一个要点,上面有一个完全可运行的版本如果你想引用它:https://gist.github.com/ppanyukov/253d251a16fbb660f225fb425d32206a
关于apache-spark - Spark reduceByKey 以及如何最小化混洗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36455419/
我是一名优秀的程序员,十分优秀!