gpt4 book ai didi

scala - 我想将 Hive 中所有现有的 UDTF 转换为 Scala 函数并从 Spark SQL 使用它

转载 作者:可可西里 更新时间:2023-11-01 15:29:40 27 4
gpt4 key购买 nike

任何人都可以给我一个用 scala 编写的返回多行并将其用作 SparkSQL 中的 UDF 的示例 UDTF(例如;explode)吗?

表:表1

+------+----------+----------+
|userId|someString| varA|
+------+----------+----------+
| 1| example1| [0, 2, 5]|
| 2| example2|[1, 20, 5]|
+------+----------+----------+

我想创建以下 Scala 代码:

def exampleUDTF(var: Seq[Int]) = <Return Type???>  {
// code to explode varA field ???
}

sqlContext.udf.register("exampleUDTF",exampleUDTF _)

sqlContext.sql("FROM table1 SELECT userId, someString, exampleUDTF(varA)").collect().foreach(println)

预期输出:

+------+----------+----+
|userId|someString|varA|
+------+----------+----+
| 1| example1| 0|
| 1| example1| 2|
| 1| example1| 5|
| 2| example2| 1|
| 2| example2| 20|
| 2| example2| 5|
+------+----------+----+

最佳答案

您不能使用 UDF 执行此操作。 UDF 只能将单个列添加到 DataFrame。但是,有一个名为 DataFrame.explode 的函数,您可以改用它。要用你的例子来做,你会这样做:

import org.apache.spark.sql._

val df = Seq(
(1,"example1", Array(0,2,5)),
(2,"example2", Array(1,20,5))
).toDF("userId", "someString", "varA")

val explodedDf = df.explode($"varA"){
case Row(arr: Seq[Int]) => arr.toArray.map(a => Tuple1(a))
}.drop($"varA").withColumnRenamed("_1", "varA")

+------+----------+-----+
|userId|someString| varA|
+------+----------+-----+
| 1| example1| 0|
| 1| example1| 2|
| 1| example1| 5|
| 2| example2| 1|
| 2| example2| 20|
| 2| example2| 5|
+------+----------+-----+

请注意,explode 接受一个函数作为参数。因此,即使您无法创建 UDF 来执行您想要的操作,您也可以创建一个函数来传递给 explode 来执行您想要的操作。像这样:

def exploder(row: Row) : Array[Tuple1[Int]] = {
row match { case Row(arr) => arr.toArray.map(v => Tuple1(v)) }
}

df.explode($"varA")(exploder)

就重新创建 UDTF 而言,这大约是您将获得的最佳结果。

关于scala - 我想将 Hive 中所有现有的 UDTF 转换为 Scala 函数并从 Spark SQL 使用它,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36543374/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com