gpt4 book ai didi

scala - Spark accumulableCollection 不适用于 mutable.Map

转载 作者:行者123 更新时间:2023-12-04 17:51:25 30 4
gpt4 key购买 nike

我正在使用 Spark 来累积员工记录,为此我使用了 Spark 的累加器。我使用 Map[empId, emp] 作为 accumulableCollection 以便我可以通过他们的 ID 搜索员工。我已经尝试了一切,但它不起作用。有人可以指出我使用 accumulableCollection 的方式是否存在任何逻辑问题,或者不支持 Map。以下是我的代码

package demo

import org.apache.spark.{SparkContext, SparkConf, Logging}

import org.apache.spark.SparkContext._
import scala.collection.mutable


object MapAccuApp extends App with Logging {
case class Employee(id:String, name:String, dept:String)

val conf = new SparkConf().setAppName("Employees") setMaster ("local[4]")
val sc = new SparkContext(conf)

implicit def empMapToSet(empIdToEmp: mutable.Map[String, Employee]): mutable.MutableList[Employee] = {
empIdToEmp.foldLeft(mutable.MutableList[Employee]()) { (l, e) => l += e._2}
}

val empAccu = sc.accumulableCollection[mutable.Map[String, Employee], Employee](mutable.Map[String,Employee]())

val employees = List(
Employee("10001", "Tom", "Eng"),
Employee("10002", "Roger", "Sales"),
Employee("10003", "Rafael", "Sales"),
Employee("10004", "David", "Sales"),
Employee("10005", "Moore", "Sales"),
Employee("10006", "Dawn", "Sales"),
Employee("10007", "Stud", "Marketing"),
Employee("10008", "Brown", "QA")
)

System.out.println("employee count " + employees.size)


sc.parallelize(employees).foreach(e => {
empAccu += e
})

System.out.println("empAccumulator size " + empAccu.value.size)
}

最佳答案

使用 accumulableCollection 似乎对您的问题有点矫枉过正,如下所示:

import org.apache.spark.{AccumulableParam, Accumulable, SparkContext, SparkConf}

import scala.collection.mutable

case class Employee(id:String, name:String, dept:String)

val conf = new SparkConf().setAppName("Employees") setMaster ("local[4]")
val sc = new SparkContext(conf)

implicit def mapAccum =
new AccumulableParam[mutable.Map[String,Employee], Employee]
{
def addInPlace(t1: mutable.Map[String,Employee],
t2: mutable.Map[String,Employee])
: mutable.Map[String,Employee] = {
t1 ++= t2
t1
}
def addAccumulator(t1: mutable.Map[String,Employee], e: Employee)
: mutable.Map[String,Employee] = {
t1 += (e.id -> e)
t1
}
def zero(t: mutable.Map[String,Employee])
: mutable.Map[String,Employee] = {
mutable.Map[String,Employee]()
}
}

val empAccu = sc.accumulable(mutable.Map[String,Employee]())

val employees = List(
Employee("10001", "Tom", "Eng"),
Employee("10002", "Roger", "Sales"),
Employee("10003", "Rafael", "Sales"),
Employee("10004", "David", "Sales"),
Employee("10005", "Moore", "Sales"),
Employee("10006", "Dawn", "Sales"),
Employee("10007", "Stud", "Marketing"),
Employee("10008", "Brown", "QA")
)

System.out.println("employee count " + employees.size)

sc.parallelize(employees).foreach(e => {
empAccu += e
})

println("empAccumulator size " + empAccu.value.size)
empAccu.value.foreach(entry =>
println("emp id = " + entry._1 + " name = " + entry._2.name))

虽然目前对此的记录很少,relevant test在 Spark 代码库中非常有启发性。

编辑:事实证明,使用 accumulableCollection 确实具有值(value):您不需要定义 AccumulableParam 和以下作品。我将保留这两种解决方案,以防它们对人们有用。

case class Employee(id:String, name:String, dept:String)

val conf = new SparkConf().setAppName("Employees") setMaster ("local[4]")
val sc = new SparkContext(conf)

val empAccu = sc.accumulableCollection(mutable.HashMap[String,Employee]())

val employees = List(
Employee("10001", "Tom", "Eng"),
Employee("10002", "Roger", "Sales"),
Employee("10003", "Rafael", "Sales"),
Employee("10004", "David", "Sales"),
Employee("10005", "Moore", "Sales"),
Employee("10006", "Dawn", "Sales"),
Employee("10007", "Stud", "Marketing"),
Employee("10008", "Brown", "QA")
)

System.out.println("employee count " + employees.size)

sc.parallelize(employees).foreach(e => {
// notice this is different from the previous solution
empAccu += e.id -> e
})

println("empAccumulator size " + empAccu.value.size)
empAccu.value.foreach(entry =>
println("emp id = " + entry._1 + " name = " + entry._2.name))

这两种解决方案都使用 Spark 1.0.2 进行了测试。

关于scala - Spark accumulableCollection 不适用于 mutable.Map,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25917476/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com