gpt4 book ai didi

scala - 当 DF 有太多列时,Spark UDF 每条记录调用一次以上

转载 作者:行者123 更新时间:2023-12-03 23:33:28 24 4
gpt4 key购买 nike

我正在使用 Spark 1.6.1 并遇到一个奇怪的行为:我在包含一些输入数据的数据帧上运行带有一些繁重计算(物理模拟)的 UDF,并构建了一个包含许多列的结果数据帧(~40 )。

奇怪的是,在这种情况下,我的 UDF 被我的输入数据帧的每个记录调用不止一次(经常是 1.6 倍),我认为这是 Not Acceptable ,因为它非常昂贵。如果我减少列数(例如减少到 20),那么这种行为就会消失。

我设法写下了一个小脚本来演示这一点:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf


object Demo {

case class Result(a: Double)

def main(args: Array[String]): Unit = {

val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val numRuns = sc.accumulator(0) // to count the number of udf calls

val myUdf = udf((i:Int) => {numRuns.add(1);Result(i.toDouble)})

val data = sc.parallelize((1 to 100), numSlices = 5).toDF("id")

// get results of UDF
var results = data
.withColumn("tmp", myUdf($"id"))
.withColumn("result", $"tmp.a")


// add many columns to dataframe (must depend on the UDF's result)
for (i <- 1 to 42) {
results=results.withColumn(s"col_$i",$"result")
}

// trigger action
val res = results.collect()
println(res.size) // prints 100

println(numRuns.value) // prints 160

}
}

现在,有没有办法在不减少列数的情况下解决这个问题?

最佳答案

我无法真正解释这种行为 - 但显然查询计划以某种方式选择了一些记录被计算两次的路径。这意味着如果我们 缓存 中间结果(在应用 UDF 之后)我们可能能够“强制”Spark 不要重新计算 UDF。事实上,一旦添加了缓存,它的行为就会像预期的那样——UDF 被准确地调用了 100 次:

// get results of UDF
var results = data
.withColumn("tmp", myUdf($"id"))
.withColumn("result", $"tmp.a").cache()

当然,缓存有其自身的成本(内存......),但如果它节省了许多 UDF 调用,它最终可能对您有利。

关于scala - 当 DF 有太多列时,Spark UDF 每条记录调用一次以上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40320563/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com