gpt4 book ai didi

performance - Spark 中的低性能 reduceByKey()

转载 作者:行者123 更新时间:2023-12-05 08:24:08 28 4
gpt4 key购买 nike

我正在 Spark 上编写一个程序,我只是通过键进行聚合。该程序非常简单。我的输入数据只有 2GB,在设置为 local[2] 的多核服务器(8 核,32GB RAM)上运行。那就是使用两个内核进行并行化。但是,我发现性能很差。几乎需要两个小时才能完成。我正在使用 KryoSerializer。我想这可能是由 Serializer 引起的。如何解决这个问题?

  val dataPoints = SparkContextManager.textFile(dataLocation)
.map(x => {
val delimited = x.split(",")
(delimited(ColumnIndices.HOME_ID_COLUMN).toLong,
delimited(ColumnIndices.USAGE_READING_COLUMN).toDouble)
})

def process(step: Int): Array[(Long, List[Double])] = {
val resultRDD = new PairRDDFunctions(dataPoints.map(x =>(x._1, List[Double](x._2))))
resultRDD.reduceByKey((x, y) => x++y).collect()
}

输出将是:

1, [1, 3, 13, 21, ..., 111] // The size of list is about 4000
2, [24,34,65, 24, ..., 245]
....

最佳答案

看起来您正在尝试编写一个 Spark 作业,该作业将与同一键关联的值组合在一起。 PairRDDFunctions 有一个执行此操作的 groupByKey 操作。 Spark 的 groupByKey 实现利用多项性能优化来创建更少的临时对象并通过网络随机播放更少的数据(因为每个值都不会包装在 List 中)。

如果您导入 Spark 的隐式转换,使用

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

那么您将不需要使用 PairRDDFunctions 手动包装映射的 RDD 以访问 groupByKey 等函数。这不会对性能产生影响,并使大型 Spark 程序更易于阅读。

使用groupByKey,我认为您的process 函数可以重写为

def process(step: Int): Array[(Long, Seq[Double])] = {
dataPoints.groupByKey().collect()
}

我还会考虑增加并行度:groupByKeyreduceByKey 都采用可选的 numTasks 参数来控制 reducer 的数量;默认情况下,Spark 仅对 groupByKeyreduceByKey 使用 8 个并行任务。这在 Spark Scala Programming Guide 中有描述。 ,以及在 Scaladoc .

关于performance - Spark 中的低性能 reduceByKey(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22310110/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com