gpt4 book ai didi

Scala和Spark UDF功能

转载 作者:行者123 更新时间:2023-12-04 03:19:39 25 4
gpt4 key购买 nike

我制作了一个简单的UDF,以在spark.temptabl中的时间字段中转换或提取一些值。我注册了该函数,但是当我使用sql调用该函数时,它将引发NullPointerException。下面是我的功能和执行过程。我正在使用齐柏林飞艇。遗憾的是,这在昨天仍然有效,但是今天早上停止了工作。

功能

def convert( time:String ) : String = {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
return sdf.format(time1)
}

注册功能
sqlContext.udf.register("convert",convert _)

在不使用SQL的情况下测试函数-可行
convert(12:12:12) -> returns 12:12

在Zeppelin中使用SQL测试该功能失败。
%sql
select convert(time) from temptable limit 10

临时表的结构
root
|-- date: string (nullable = true)
|-- time: string (nullable = true)
|-- serverip: string (nullable = true)
|-- request: string (nullable = true)
|-- resource: string (nullable = true)
|-- protocol: integer (nullable = true)
|-- sourceip: string (nullable = true)

我正在获取的堆栈跟踪的一部分。
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)

最佳答案

使用udf代替直接定义函数

import org.apache.spark.sql.functions._

val convert = udf[String, String](time => {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
sdf.format(time1)
}
)

udf的输入参数是Column(或多个Columns)。返回类型为Column。
case class UserDefinedFunction protected[sql] (
f: AnyRef,
dataType: DataType,
inputTypes: Option[Seq[DataType]]) {

def apply(exprs: Column*): Column = {
Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
}
}

关于Scala和Spark UDF功能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38633216/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com