gpt4 book ai didi

apache-spark - Spark SQL 将数组扩展为多列

转载 作者:行者123 更新时间:2023-12-04 04:57:39 27 4
gpt4 key购买 nike

我正在为来自 S3 中的 Oracle 源的每一行更新存储 json 消息。json结构如下

{
"tableName": "ORDER",
"action": "UPDATE",
"timeStamp": "2016-09-04 20:05:08.000000",
"uniqueIdentifier": "31200477027942016-09-05 20:05:08.000000",
"columnList": [{
"columnName": "ORDER_NO",
"newValue": "31033045",
"oldValue": ""
}, {
"columnName": "ORDER_TYPE",
"newValue": "N/B",
"oldValue": ""
}]
}

我正在使用 spark sql 根据唯一标识符的最大值查找每个键的最新记录。columnList 是一个包含表列列表的数组。我想连接多个表并获取最新的记录。我如何将一个表的 json 数组中的列与另一个表中的列连接起来。有没有办法将 json 数组分解为多列。例如,上面的 json 会将 ORDER_NO 作为一列,将 ORDER_TYPE 作为另一列。如何根据 columnName 字段创建具有多列的数据框例如:新的 RDD 应该有列 (tableName, action, timeStamp, uniqueIdentifier, ORDER_NO, ORDER_NO)ORDER_NO 和 ORDER_NO 字段的值应该从 json 中的 newValue 字段映射。

最佳答案

通过使用 RDD api 以编程方式创建模式找到了解决方案

  Dataset<Row> dataFrame = spark.read().json(inputPath);
dataFrame.printSchema();
JavaRDD<Row> rdd = dataFrame.toJavaRDD();
SchemaBuilder schemaBuilder = new SchemaBuilder();
// get the schema column names in appended format
String columnNames = schemaBuilder.populateColumnSchema(rdd.first(), dataFrame.columns());

SchemaBuilder 是一个创建的自定义类,它获取 rdd 详细信息并返回分隔符分隔的列名。然后使用 RowFactory.create 调用,将 json 值映射到模式。文档引用 http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema

关于apache-spark - Spark SQL 将数组扩展为多列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41606039/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com