gpt4 book ai didi

scala - 统计Spark中UDF的调用次数

转载 作者:行者123 更新时间:2023-12-03 04:44:59 26 4
gpt4 key购买 nike

使用Spark 1.6.1我想调用UDF被调用的次数。我想这样做是因为我有一个非常昂贵的 UDF(每次调用约 1 秒),并且我怀疑 UDF 的调用频率比我的数据帧中的记录数更频繁,这使得我的 Spark 作业速度比必要的慢 .

虽然我无法重现这种情况,但我想出了一个简单的示例,表明对 UDF 的调用次数似乎与行数不同(此处:更少),这是怎么回事?

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf

object Demo extends App {
val conf = new SparkConf().setMaster("local[4]").setAppName("Demo")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._


val callCounter = sc.accumulator(0)

val df= sc.parallelize(1 to 10000,numSlices = 100).toDF("value")

println(df.count) // gives 10000

val myudf = udf((d:Int) => {callCounter.add(1);d})

val res = df.withColumn("result",myudf($"value")).cache

println(res.select($"result").collect().size) // gives 10000
println(callCounter.value) // gives 9941

}

如果使用累加器不是调用 UDF 计数的正确方法,我还能怎么做?

注意:在我实际的 Spark-Job 中,得到的调用计数大约是实际记录数的 1.7 倍。

最佳答案

Spark 应用程序应该定义 main() 方法,而不是扩展 scala.App。 scala.App 的子类可能无法正常工作。

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf

object Demo extends App {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]")
val sc = new SparkContext(conf)
// [...]
}
}

这应该可以解决您的问题。

关于scala - 统计Spark中UDF的调用次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40316417/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com