gpt4 book ai didi

scala - Spark AccumulatorParam 通用参数

转载 作者:行者123 更新时间:2023-12-04 19:32:42 26 4
gpt4 key购买 nike

我在 Spark 中使用累加器时遇到问题。正如在 Spark 网站上看到的那样,如果您想要自定义累加器,您可以简单地(使用对象)扩展 AccumulatorParam 特性。问题是我想但不能使该对象通用,例如:

object SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {

override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()

override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2

}

但这会给我一个编译错误,因为对象不能使用通用参数。我的情况真的不允许我为每个给定类型定义一个SeqAccumulatorParam,因为这会导致大量丑陋的代码重复。

我有另一种方法,只需将所有结果放在 RDD 中,然后使用为该单一类型定义的累加器迭代它们,但这会好得多。

我的问题是:还有其他创建累加器的方法吗?

最佳答案

您可以简单地使用一个类来创建对象,而不是单例对象。

class SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {
override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()
override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2
}

val seqAccum = sc.accumulator(Seq[Int]())(new SeqAccumulatorParam[Int]())

val lists = (1 to 5).map(x => (0 to x).toList)
sc.parallelize(lists).foreach(x => seqAccum += x)

seqAccum.value
// Seq[Int] = List(0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 0, 1, 2, 0, 1)
// result can be in different order.

// For Doubles.
val seqAccumD = sc.accumulator(Seq[Double]())(new SeqAccumulatorParam[Double]())
sc.parallelize(lists.map(x => x.map(_.toDouble))).foreach(x => seqAccumD += x)

seqAccumD.value
// Seq[Double] = List(0.0, 1.0, 0.0, 1.0, 2.0, 0.0, 1.0, 2.0, 3.0, 0.0, 1.0, 2.0, 3.0, 4.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0)

关于scala - Spark AccumulatorParam 通用参数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26139145/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com