gpt4 book ai didi

scala - 在 UDF 中使用时 Spark 累加器为空

转载 作者:行者123 更新时间:2023-12-05 07:35:33 26 4
gpt4 key购买 nike

我正在努力优化我的 Spark 进程,并尝试将 UDF 与累加器结合使用。我已经让累加器自行工作,并且正在寻找使用 UDF 是否可以加快速度。但是,当我将累加器包装在 UDF 中时,它仍然是空的。我是不是特别有问题?即使使用我的 .count 它仍然没有执行,延迟执行是否发生了什么?

输入:

0,[0.11,0.22]
1,[0.22,0.33]

输出:

(0,0,0.11),(0,1,0.22),(1,0,0.22),(1,1,0.33)

代码

  val accum = new MapAccumulator2d()
val session = SparkSession.builder().getOrCreate()
session.sparkContext.register(accum)

//Does not work - Empty Accumlator
val rowAccum = udf((itemId: Int, item: mutable.WrappedArray[Float]) => {
val map = item
.zipWithIndex
.map(ff => {
((itemId, ff._2), ff._1.toDouble)
}).toMap
accum.add(map)
itemId
})
dataFrame.select(rowAccum(col("itemId"), col("jaccardList"))).count

//Works
dataFrame.foreach(f => {
val map = f.getAs[mutable.WrappedArray[Float]](1)
.zipWithIndex
.map(ff => {
((f.getInt(0), ff._2), ff._1.toDouble)
}).toMap
accum.add(map)
})

val list = accum.value.toList.map(f => (f._1._1, f._1._2, f._2))

最佳答案

看起来这里唯一的问题是使用 count 来“触发”延迟计算的 UDF:Spark 足够“聪明”,可以意识到 select 操作不能不会更改 count 的结果,因此不会真正执行 UDF。选择不同的操作(例如 collect)表明 UDF 工作并更新了累加器。

这是一个(更简洁的)示例:

val accum = sc.longAccumulator

val rowAccum = udf((itemId: Int) => { accum.add(itemId); itemId })

val dataFrame = Seq(1,2,3,4,5).toDF("itemId")

dataFrame.select(rowAccum(col("itemId"))).count() // won't trigger UDF
println(s"RESULT: ${accum.value}") // prints 0

dataFrame.select(rowAccum(col("itemId"))).collect() // triggers UDF
println(s"RESULT: ${accum.value}") // prints 15

关于scala - 在 UDF 中使用时 Spark 累加器为空,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49500616/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com