gpt4 book ai didi

scala - 如何传递 Scala UserDefinedFunction 其中输出是复杂类型(使用 StructType 和 StructField)以从 Pyspark 使用

转载 作者:行者123 更新时间:2023-12-03 23:43:38 28 4
gpt4 key购买 nike

所以,我想创建一个可以在 Pyspark 中使用的 Scala UDF。
我想要的是接受字符串列表为 x和字符串列表 y
获取所有字符串组合
所以如果我有 x = ["a","b] 和 y=["A","B"] 我希望输出是 out = [[a,A],[a,B],[b, A],[b,B]]
我设法编写的 Scala 代码很简单

(x: Seq[String], y: Seq[String]) => {for (a <- x; b <-y) yield (a,b)}
我创建了一个这样做的 Scala UDF。它适用于 Scala Spark。
我的问题是试图从 pyspark 中调用它。
为了做到这一点,我已经这样做了:
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.UserDefinedFunction

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.api.java.UDF1

class DualArrayExplode extends UDF2[Seq[String], Seq[String], UserDefinedFunction] {
override def call(x: Seq[String], y: Seq[String]):UserDefinedFunction = {
// (worker node stuff)

val DualArrayExplode = (x: Seq[String], y: Seq[String]) => {for (a <- x; b <-y) yield (a,b)}
val DualArrayExplodeUDF = (udf(DualArrayExplode))

return DualArrayExplodeUDF

}
}

object DualArrayExplode {
def apply(): DualArrayExplode = {
new DualArrayExplode()
}
}
我创建了一个包含此代码和其他功能的 jar(可以正常工作)
这段代码编译没有问题。
我在 scala spark 中使用它时的输出列类型是 Array(ArrayType(StructType(StructField(_1,StringType,true), StructField(_2,StringType,true)),true))我的问题是我无法让它与 Pyspark 一起使用。注册此函数时无法定义正确的返回类型。
这是我尝试注册 UDF 的方法
spark.udf.registerJavaFunction('DualArrayExplode', 
'blah.blah.blah.blah.blah.DualArrayExplode', <WHAT_TYPE_HERE???>)
返回类型是可选的,但如果我省略它,那么结果是 [] (一个空列表)
那么......我如何在pyspark中实际使用这个scala UDF?
我意识到可能有很多事情出错,因此尝试尽可能地描述整个设置。

最佳答案

声明DualArrayExplode

class DualArrayExplode extends UDF2[Seq[String], Seq[String], UserDefinedFunction]
这意味着声明了一个 udf,它接受两个字符串序列作为输入并返回一个 udf。这个应该改成
class DualArrayExplode extends UDF2[Seq[String], Seq[String], Seq[(String,String)]] {
override def call(x: Seq[String], y: Seq[String]): Seq[(String,String)]= {
// (worker node stuff)
for (a <- x; b <-y) yield (a,b)
}
}
udf 的返回类型已更改为字符串元组序列。
这个 udf 现在可以在 Pyspark 中注册
from pyspark.sql import types as T
rt = T.ArrayType(T.StructType([T.StructField("_1",T.StringType()),
T.StructField("_2",T.StringType())]))
spark.udf.registerJavaFunction(name='DualArrayExplode',
javaClassName='blah.blah.DualArrayExplode', returnType=rt)

关于scala - 如何传递 Scala UserDefinedFunction 其中输出是复杂类型(使用 StructType 和 StructField)以从 Pyspark 使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64276458/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com