gpt4 book ai didi

scala - Spark SQL - 从 sql 函数生成数组数组

转载 作者:行者123 更新时间:2023-12-04 22:43:59 26 4
gpt4 key购买 nike

我想创建一个数组数组。这是我的数据表:

// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)

// Create an RDD with some data
val x = sc.parallelize(Array(
Testing(null, 21, 905),
Testing("Noelia", 26, 1130),
Testing("Pilar", 52, 1890),
Testing("Roberto", 31, 1450)
))

// Convert RDD to a DataFrame
val df = sqlContext.createDataFrame(x)

// For SQL usage we need to register the table
df.registerTempTable("df")

我想创建一个整数列“年龄”的数组。为此,我使用“collect_list”:
sqlContext.sql("SELECT collect_list(age) as age from df").show

但是现在我想生成一个包含上面创建的多个数组的数组:
 sqlContext.sql("SELECT collect_list(collect_list(age), collect_list(salary)) as arrayInt from df").show

但这不起作用,或者使用函数 org.apache.spark.sql.functions.array。有任何想法吗?

最佳答案

好吧,事情再简单不过了。让我们考虑您正在处理的相同数据,并从那里一步一步地进行

// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)

// Create an RDD with some data
val x = sc.parallelize(Array(
Testing(null, 21, 905),
Testing("Noelia", 26, 1130),
Testing("Pilar", 52, 1890),
Testing("Roberto", 31, 1450)
))

// Convert RDD to a DataFrame
val df = sqlContext.createDataFrame(x)

// For SQL usage we need to register the table
df.registerTempTable("df")
sqlContext.sql("select collect_list(age) as age from df").show

// +----------------+
// | age|
// +----------------+
// |[21, 26, 52, 31]|
// +----------------+

sqlContext.sql("select collect_list(collect_list(age), collect_list(salary)) as arrayInt from df").show

正如错误消息所说:
org.apache.spark.sql.AnalysisException: No handler for Hive udf class
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Exactly one argument is expected..; line 1 pos 52 [...]
collest_list只需要一个参数。让我们查看文档 here .

它实际上需要一个参数!但是让我们在函数对象的文档中更进一步。您似乎已经注意到数组函数允许您从 Column 或重复的 Column 参数中创建新的数组列。所以让我们使用它:
sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df").show(false)

数组函数确实从列列表中创建了一个列,由 collect_list 在 age 和salary 上预先创建:
// +-------------------------------------------------------------------+
// |arrayInt |
// +-------------------------------------------------------------------+
// |[WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450)]|
// +-------------------------------------------------------------------+

我们从这里去哪里?

您必须记住,来自 DataFrame 的 Row 只是由 Row 包装的另一个集合。

我要做的第一件事就是处理这个集合。那么我们如何展平 WrappedArray[WrappedArray[Int]] ?

Scala 有点神奇,你只需要使用 .flatten
import scala.collection.mutable.WrappedArray

val firstRow: mutable.WrappedArray[mutable.WrappedArray[Int]] =
sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df")
.first.get(0).asInstanceOf[WrappedArray[WrappedArray[Int]]]
// res26: scala.collection.mutable.WrappedArray[scala.collection.mutable.WrappedArray[Int]] =
// WrappedArray(WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450))

firstRow.flatten
// res27: scala.collection.mutable.IndexedSeq[Int] = ArrayBuffer(21, 26, 52, 31, 905, 1130, 1890, 1450)

现在让我们将它包装在 UDF 中,以便我们可以在 DataFrame 上使用它:
def flatten(array: WrappedArray[WrappedArray[Int]]) = array.flatten
sqlContext.udf.register("flatten", flatten(_: WrappedArray[WrappedArray[Int]]))

由于我们注册了 UDF,我们现在可以在 sqlContext 中使用它:
sqlContext.sql("select flatten(array(collect_list(age), collect_list(salary))) as arrayInt from df").show(false)

// +---------------------------------------+
// |arrayInt |
// +---------------------------------------+
// |[21, 26, 52, 31, 905, 1130, 1890, 1450]|
// +---------------------------------------+

我希望这有帮助 !

关于scala - Spark SQL - 从 sql 函数生成数组数组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36149608/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com