
json - Spark SQL's from_json returns null values


I load a Parquet file into a Spark DataFrame as follows:

val message = spark.read.parquet("gs://defenault-zdtt-devde/pubsub/part-00001-e9f8c58f-7de0-4537-a7be-a9a8556sede04a-c000.snappy.parquet")

When I run collect on my DataFrame I get the following result:
message.collect()

Array[org.apache.spark.sql.Row] = Array([118738748835150,2018-08-20T17:44:38.742Z,{"id":"uplink-3130-85bc","device_id":60517119992794222,"group_id":69,"group":"box-2478-2555","profile_id":3,"profile":"eolane-movee","type":"uplink","timestamp":"2018-08-20T17:44:37.048Z","count":3130,"payload":[{"timestamp":"2018-08-20T17:44:37.048Z","data":{"battery":3.5975599999999996,"temperature":27}}],"payload_encrypted":"9da25e36","payload_cleartext":"fe1b01aa","device_properties":{"appeui":"7ca97df000001190","deveui":"7ca97d0000001bb0","external_id":"Product: 3.7 / HW: 3.1 / SW: 1.8.8","no_de_serie_eolane":"4904","no_emballage":"S02066","product_version":"1.3.1"},"protocol_data":{"AppNonce":"e820ef","DevAddr":"0e6c5fda","DevNonce":"85bc","NetID":"000007","best_gateway_id":"M40246","gateway.

The schema of this DataFrame is:
message.printSchema()
root
 |-- Id: string (nullable = true)
 |-- publishTime: string (nullable = true)
 |-- data: string (nullable = true)

My goal is to process the data column, which contains the JSON, and to flatten it.
I wrote the following code:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schemaTotal = new StructType(Array(
  StructField("id", StringType, false), StructField("device_id", StringType), StructField("group_id", LongType),
  StructField("group", StringType), StructField("profile_id", IntegerType), StructField("profile", StringType),
  StructField("type", StringType), StructField("timestamp", StringType), StructField("count", StringType),
  StructField("payload", new StructType()
    .add("timestamp", StringType)
    .add("data", new ArrayType(new StructType().add("battery", LongType).add("temperature", LongType), false))),
  StructField("payload_encrypted", StringType), StructField("payload_cleartext", StringType),
  StructField("device_properties", new ArrayType(new StructType()
    .add("appeui", StringType).add("deveui", StringType).add("external_id", StringType)
    .add("no_de_serie_eolane", LongType).add("no_emballage", StringType).add("product_version", StringType), false)),
  StructField("protocol_data", new ArrayType(new StructType()
    .add("AppNonce", StringType).add("DevAddr", StringType).add("DevNonce", StringType).add("NetID", LongType)
    .add("best_gateway_id", StringType).add("gateways", IntegerType).add("lora_version", IntegerType)
    .add("noise", LongType).add("port", IntegerType).add("rssi", DoubleType).add("sf", IntegerType)
    .add("signal", DoubleType).add("snr", DoubleType), false)),
  StructField("lat", StringType), StructField("lng", StringType),
  StructField("geolocation_type", StringType), StructField("geolocation_precision", StringType),
  StructField("delivered_at", StringType)))


val dataframe_extract = message.select(
  $"Id",
  $"publishTime",
  from_json($"data", schemaTotal).as("content"))

val table = dataframe_extract.select(
$"Id",
$"publishTime",
$"content.id" as "id",
$"content.device_id" as "device_id",
$"content.group_id" as "group_id",
$"content.group" as "group",
$"content.profile_id" as "profile_id",
$"content.profile" as "profile",
$"content.type" as "type",
$"content.timestamp" as "timestamp",
$"content.count" as "count",
$"content.payload.timestamp" as "timestamp2",
$"content.payload.data.battery" as "battery",
$"content.payload.data.temperature" as "temperature",
$"content.payload_encrypted" as "payload_encrypted",
$"content.payload_cleartext" as "payload_cleartext",
$"content.device_properties.appeui" as "appeui"
)
table.show() gives me null values in every column:
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+
| Id| publishTime| id|device_id|group_id|group|profile_id|profile|type|timestamp|count|timestamp2|battery|temperature|payload_encrypted|payload_cleartext|appeui|
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+
|118738748835150|2018-08-20T17:44:...|null| null| null| null| null| null|null| null| null| null| null| null| null| null| null|
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+

, whereas table.printSchema() gives me the expected result. Does anyone know how to solve this problem?
I am using Zeppelin as a first step for prototyping; any help would be greatly appreciated.
Best regards

Best answer

The from_json() SQL function has the following constraint when converting column values to a DataFrame.

  • The data types you define in the schema must match the values actually present in the JSON; if any column value does not match, every column in the result comes back as null.

  • e.g., for this column value:

    '{"name": "raj", "age": 12}'

    the schema

    StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))

    will return null for both columns, whereas the schema

    StructType(List(StructField(name,StringType,true),StructField(age,IntegerType,true)))

    will return the DataFrame you expect (a runnable sketch of both cases follows below).

For this thread, the cause is probably the same: if any column value does not match the schema, from_json returns null for all columns.
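One way to find where the hand-written schema diverges from the actual JSON (a diagnostic sketch, not part of the original answer) is to let Spark infer the schema of the data column and compare it with schemaTotal; this uses the Spark 2.2+ spark.read.json(Dataset[String]) variant:

import spark.implicits._

// Infer the schema directly from the JSON strings in the `data` column
val inferred = spark.read.json(message.select($"data").as[String]).schema
println(inferred.treeString)

Comparing that with the sample record in the question: payload is an array of objects while device_properties and protocol_data are plain objects, so declaring them the other way round (StructType vs ArrayType), as schemaTotal does, is a likely source of the nulls.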

Regarding "json - Spark SQL's from_json returns null values", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51948749/
