gpt4 book ai didi

scala - 如何在Spark SQL中将额外的参数传递给UDF?

转载 作者:行者123 更新时间:2023-12-03 14:30:14 27 4
gpt4 key购买 nike

我想解析DataFrame中的日期列,并且对于每个日期列,日期的分辨率可能会发生变化(即如果分辨率设置为“月”,则为2011/01/10 => 2011/01)。

我写了以下代码:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
import org.apache.spark.sql.functions._
val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}

val allColNames = dataframe.columns
val allCols = allColNames.map(name => dataframe.col(name))

val mappedCols =
{
for(i <- allCols.indices) yield
{
schema(i) match
{
case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
case _ => allCols(i)
}
}
}

dataframe.select(mappedCols:_*)

}}

但是,它不起作用。看来我只能将 Column传递给UDF。而且我想知道如果将 DataFrame转换为 RDD并将函数应用于每一行是否会非常慢。

有谁知道正确的解决方案?谢谢!

最佳答案

只需使用一些小技巧:

def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
SparkDateTimeConverter.convertDate(x, resolution))

并如下使用它:
case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))

附带说明一下,您应该看看 sql.functions.truncsql.functions.date_format。这些应该至少是工作的一部分,而根本不使用UDF。

注意:

在Spark 2.2或更高版本中,您可以使用 typedLit函数:
import org.apache.spark.sql.functions.typedLit

支持更广泛的文字,例如 SeqMap

关于scala - 如何在Spark SQL中将额外的参数传递给UDF?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35546576/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com