gpt4 book ai didi

apache-spark - Spark SQL(语言,而非 API)和来自 UDF 的行数据访问

转载 作者:行者123 更新时间:2023-12-04 10:31:49 25 4
gpt4 key购买 nike

我在 Spark SQL 表达式(SQL 语言)中使用自己的 Spark UDF 函数(不是通过 Spark API)。如果我的 UDF 函数内部出现故障,我想访问包含所有列的整行并公开此信息(例如通过自定义异常或日志)以更好地处理错误。

现在我不知道如何访问我的 UDF 中的行列,甚至不知道如何通过 SQL 将所有列传递给我的 UDF。请建议。

最佳答案

您可以使用 struct("*") 将整行作为附加参数传递。 , 或 struct(*)在 SQL 中。例子:

val df = Seq(
(1, Option.empty[String], 20)
).toDF("id", "name", "age")

val myUDF = udf((name: String, row: Row) =>
try {
Some(name.toLowerCase())
} catch {
case e: Exception => println(row.mkString(","))
None
}
)

df
.select(myUDF($"name",struct("*")))
.show()

然后您会在日志中看到该行的内容(在本例中为 1,null,20 )。由于日志在远程机器上,这可能令人沮丧。

更多关于调试/异常处理:您可以通过使用以行的字符串表示形式重新抛出异常作为消息来将异常传播到驱动程序。请注意,如果发生异常,您的作业将失败:
val myUDF = udf((name: String, row: Row) =>
try {
name.toLowerCase()
} catch {
case e: Exception => throw new Exception("row : "+row.mkString(","),e)
}
)

我的首选方案 是从包含错误消息的 UDF 返回一个附加列,这也不会在出现错误时停止 spark 作业:
val myUDF = udf((name: String) => {
val result: (Option[String], Option[String]) = try {
(Option(name.toLowerCase()), None)
} catch {
case e: java.lang.Exception => (None, Option(e.toString()))
}
result
}
)

df
.withColumn("tmp",myUDF($"name"))
.withColumn("udf_result",$"tmp._1")
.withColumn("error",$"tmp._2").drop($"tmp")
.show(false)

+---+----+---+----------+------------------------------+
|id |name|age|udf_result|error |
+---+----+---+----------+------------------------------+
|1 |null|20 |null |java.lang.NullPointerException|
+---+----+---+----------+------------------------------+

像这样,不需要将整行传递给 udf,您可以简单地过滤您的 df df.where($"error".isNotNull)

关于apache-spark - Spark SQL(语言,而非 API)和来自 UDF 的行数据访问,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60391212/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com