gpt4 book ai didi

sql - 以编程方式向 Spark DataFrame 添加多个列

转载 作者:行者123 更新时间:2023-12-05 03:12:44 24 4
gpt4 key购买 nike

我在 scala 中使用 spark。

我有一个包含 3 列的数据框:ID、时间、RawHexdata。我有一个用户定义的函数,它接受 RawHexData 并将其扩展为 X 列。重要的是要说明每一行 X 是相同的(列不变)。但是,在我收到第一个数据之前,我不知道这些列是什么。但是一旦我有了头,我就可以推断出来。

我想要第二个具有所述列的 Dataframe:Id,Time,RawHexData,NewCol1,...,NewCol3。

我能想到的“最简单”的方法是:1.将每一行反序列化为json(这里每个数据类型都是可序列化的)2. 添加我的新专栏,3. 从修改后的 json 反序列化一个新的 dataframe,

然而,这似乎是一种浪费,因为它涉及 2 个昂贵且冗余的 json 序列化步骤。我正在寻找更简洁的模式。

使用案例类,似乎是个坏主意,因为我不知道列数,也不知道列名。

最佳答案

要动态扩展DataFrame,您可以对行RDD 进行操作,您可以通过调用dataFrame.rdd 获取该行。有了 Row 实例,您就可以访问 RawHexdata 列并解析其中包含的数据。通过将新解析的列添加到生成的 Row 中,您几乎已经解决了问题。将 RDD[Row] 转换回 DataFrame 唯一需要做的就是为新列生成架构数据。为此,您可以在驱动程序上收集单个 RawHexdata 值,然后提取列类型。

以下代码说明了这种方法。

object App {

case class Person(name: String, age: Int)

def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val input = sc.parallelize(Seq(Person("a", 1), Person("b", 2)))
val dataFrame = input.df

dataFrame.show()

// create the extended rows RDD
val rowRDD = dataFrame.rdd.map{
row =>
val blob = row(1).asInstanceOf[Int]
val newColumns: Seq[Any] = Seq(blob, blob * 2, blob * 3)
Row.fromSeq(row.toSeq.init ++ newColumns)
}

val schema = dataFrame.schema

// we know that the new columns are all integers
val newColumns = StructType{
Seq(new StructField("1", IntegerType), new StructField("2", IntegerType), new StructField("3", IntegerType))
}

val newSchema = StructType(schema.init ++ newColumns)

val newDataFrame = sqlContext.createDataFrame(rowRDD, newSchema)

newDataFrame.show()
}
}

关于sql - 以编程方式向 Spark DataFrame 添加多个列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32580396/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com