scala - Spark udf with non-column parameters

I want to pass a variable, rather than a column, to a UDF in Spark.

The map has the format shown in Spark dataframe to nested map:

val joinUDF = udf((replacementLookup: Map[String, Double], newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})

and it should be mapped over the columns like this:

columnsMap.foldLeft(df) { (currentDF, colName) =>
  println(colName._1)
  println(colName._2)
  currentDF
    .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1)))
}

but this throws:

type mismatch;
[error] found : Map
[error] required: org.apache.spark.sql.Column
[error] .withColumn("myColumn_" + colName._1, joinUDF(colName._2, col(colName._1)))

Best Answer

Every argument passed to a UDF at call time must be a Column, which is why handing it a plain Map fails with the type mismatch above. You can use currying instead, so the map is captured in the UDF's closure rather than passed as an argument:

import org.apache.spark.sql.functions._
// spark-shell provides the implicits; in an application, also import spark.implicits._
val df = Seq(("a", 1), ("b", 2)).toDF("StringColumn", "IntColumn")

// The outer function takes the lookup map; the returned UDF closes over it,
// so only the String column is passed at call time.
def joinUDF(replacementLookup: Map[String, Double]) = udf((newValue: String) => {
  replacementLookup.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})

val myMap = Map("a" -> 1.5, "b" -> 3.0)

df.select(joinUDF(myMap)($"StringColumn")).show()
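
Tying this back to the question's foldLeft, the curried UDF slots in directly. A minimal sketch, assuming columnsMap is a Map[String, Map[String, Double]] keyed by column name, as in the question:

// Hypothetical integration: build one curried UDF per entry of columnsMap.
val result = columnsMap.foldLeft(df) { (currentDF, entry) =>
  val (colName, lookup) = entry
  currentDF.withColumn("myColumn_" + colName, joinUDF(lookup)(col(colName)))
}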

Alternatively, you can try a broadcast variable:

import org.apache.spark.sql.functions._
val df = Seq(("a", 1), ("b", 2)).toDF("StringColumn", "IntColumn")

val myMap = Map("a" -> 1.5, "b" -> 3.0)
// sc is the SparkContext (available in spark-shell; use spark.sparkContext in an application)
val broadcastedMap = sc.broadcast(myMap)

def joinUDF = udf((newValue: String) => {
  broadcastedMap.value.get(newValue) match {
    case Some(tt) => tt
    case None => 0.0
  }
})

df.select(joinUDF($"StringColumn")).show()
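
Both versions ship the map to the executors; the difference is how. The curried UDF serializes the map into every task's closure, while the broadcast variant sends it to each executor once and caches it there, which is the better choice when the lookup map is large. For a small map like this one, either works.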

Regarding scala - Spark udf with non-column parameters, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41242966/
