gpt4 book ai didi

scala - 如何将 Array[Seq[String]] 传递给 apache spark udf? (错误 : Not Applicable)

转载 作者:行者123 更新时间:2023-12-02 03:15:23 25 4
gpt4 key购买 nike

我在 scala 中有以下 apache spark udf:

val myFunc = udf {
(userBias: Float, otherBiases: Map[Long, Float],
userFactors: Seq[Float], context: Seq[String]) =>
var result = Float.NaN

if (userFactors != null) {
var contexBias = 0f

for (cc <- context) {
contexBias += otherBiases(contextMapping(cc))
}

// definition of result
// ...
}
result
}

现在我想将参数传递给这个函数,但是由于参数 context,我总是收到消息 Not Applicable .我知道用户定义的函数按行接收输入,如果我删除 context,这个函数就会运行。 ...如何解决这个问题?为什么它不从 Array[Seq[String]] 中读取行,即来自 context ?或者,传递 context 也是可以接受的作为DataFrame或类似的东西。

// context is Array[Seq[String]]
val a = sc.parallelize(Seq((1,2),(3,4))).toDF("a", "b")
val context = a.collect.map(_.toSeq.map(_.toString))

// userBias("bias"), otherBias("biases") and userFactors("features")
// have a type Column, while userBias... are DataFrames
myDataframe.select(dataset("*"),
myFunc(userBias("bias"),
otherBias("biases"),
userFactors("features"),
context)
.as($(newCol)))

更新:

我尝试了 zero323 的答案中指出的解决方案,但是 context: Array[Seq[String]] 仍然存在一个小问题.特别是问题在于遍历此数组 for (cc <- context) { contexBias += otherBiases(contextMapping(cc)) } .我应该将一个字符串传递给 contextMapping , 不是 Seq[String] :

  def myFunc(context: Array[Seq[String]]) = udf {
(userBias: Float, otherBiases: Map[Long, Float],
userFactors: Seq[Float]) =>
var result = Float.NaN

if (userFactors != null) {
var contexBias = 0f
for (cc <- context) {
contexBias += otherBiases(contextMapping(cc))
}

// estimation of result

}
result
}

现在我这样调用它:

myDataframe.select(dataset("*"),
myFunc(context)(userBias("bias"),
otherBias("biases"),
userFactors("features"))
.as($(newCol)))

最佳答案

Spark 2.2+

您可以使用 typedLit 函数:

import org.apache.spark.sql.functions.typedLit

myFunc(..., typedLit(context))

Spark < 2.2

任何直接传递给 UDF 的参数都必须是 Column,所以如果你想传递常量数组,你必须将它转换为列文字:

import org.apache.spark.sql.functions.{array, lit}

val myFunc: org.apache.spark.sql.UserDefinedFunction = ???

myFunc(
userBias("bias"),
otherBias("biases"),
userFactors("features"),
// org.apache.spark.sql.Column
array(context.map(xs => array(xs.map(lit _): _*)): _*)
)

Column 对象只能使用闭包间接传递,例如:

def myFunc(context: Array[Seq[String]]) = udf {
(userBias: Float, otherBiases: Map[Long, Float], userFactors: Seq[Float]) =>
???
}

myFunc(context)(userBias("bias"), otherBias("biases"), userFactors("features"))

关于scala - 如何将 Array[Seq[String]] 传递给 apache spark udf? (错误 : Not Applicable),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37330966/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com