gpt4 book ai didi

json - 使用带有案例类和列名别名的反射的 Spark Dataframe 模式定义

转载 作者:行者123 更新时间:2023-12-04 19:46:49 27 4
gpt4 key购买 nike

我的 Spark Scala 脚本遇到了一个小问题。基本上我有原始数据,我在分组和计数等之后和之后进行聚合我想将输出保存为特定的 JSON 格式。

编辑:

我试图简化问题并重写它:

当我使用 Array[org.apache.spark.sql.Column] 从源数据框中选择数据时,其中列名具有别名,然后使用列名(或索引)作为变量尝试将行映射到案例类时,出现“任务不可序列化”异常。

var dm = sqlContext.createDataFrame(Seq((1,"James"),(2,"Anna"))).toDF("id", "name")

val cl = dm.columns
val cl2 = cl.map(name => col(name).as(name.capitalize))
val dm2 = dm.select(cl2:_*)
val n = "Name"
case class Result(Name:String)
val r = dm2.map(row => Result(row.getAs(n))).toDF

第二部分或问题,我实际上需要最终架构是这些 Result 类对象的数组。我还没有弄清楚,如何做到这一点。预期的结果应该有这样的模式:

    case class Test(var FilteredStatistics: Array[Result])
val t = Test(Array(Result("Anna"), Result("James")))

val t2 = sc.parallelize(Seq(t)).toDF

scala> t2.printSchema
root
|-- FilteredStatistics: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Name: string (nullable = true)

长话短说:

  1. 当数据框列具有别名且变量用于列名时,如何将数据框行映射到案例类对象?

  2. 如何将这些case类对象添加到一个数组中?

最佳答案

序列化问题:这里的问题是 val n = "Name":它在传递给 RDD 转换 (dm2. map(...)),这使得 Spark 关闭该变量和包含它的范围,其中还包括类型为 cl2 Array[Column],因此它不可序列化。

解决方案很简单 - 内联 n(获取 dm2.map(row => Result(row.getAs("Name")))),或者将它放在可序列化的上下文中(不包含任何不可序列化成员的对象或类)。

关于json - 使用带有案例类和列名别名的反射的 Spark Dataframe 模式定义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41245227/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com