gpt4 book ai didi

scala - 如何在 Spark 中将 Dataframe 的 String 列转换为 Struct

转载 作者:行者123 更新时间:2023-12-05 02:58:05 34 4
gpt4 key购买 nike

我目前正在使用 Structured Streaming 来消费来自 Kafka 的消息

此消息的原始格式具有以下模式结构

root
|-- incidentMessage: struct (nullable = true)
| |-- AssignedUnitEvent: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- CallNumber: string (nullable = true)
| | | |-- Code: string (nullable = true)
| | | |-- EventDateTime: string (nullable = true)
| | | |-- EventDispatcherID: string (nullable = true)
| | | |-- ID: string (nullable = true)
| | | |-- Notes: string (nullable = true)
| | | |-- PhoneNumberCalled: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- SubCallNumber: string (nullable = true)
| | | |-- SupItemNumber: string (nullable = true)
| | | |-- Type: string (nullable = true)
| | | |-- UnitID: string (nullable = true)
|-- preamble: struct (nullable = true)
| |-- gateway: string (nullable = true)
| |-- product: string (nullable = true)
| |-- psap: string (nullable = true)
| |-- refDataVersion: long (nullable = true)
| |-- source: string (nullable = true)
| |-- timestamp: string (nullable = true)
| |-- uuid: string (nullable = true)
| |-- vendor: string (nullable = true)
| |-- version: string (nullable = true)
|-- raw: string (nullable = true)

但是我在定义消息的模式时出错(在流组件中),我写了将所有根列转换为字符串的代码。

这是我写的代码

//Define the schema

val schema1 = new StructType().add("preamble",DataTypes.StringType).add("incidentMessage",DataTypes.StringType).add("raw",DataTypes.StringType)

//Apply the schema to the message (payload)

val finalResult = Df.withColumn("FinalFrame",from_json($"payload",schema1)).select($"FinalFrame.*")

现在我的数据框看起来像这样

scala> finalResult.printSchema
root
|-- incidentMessage: string (nullable = true)
|-- preamble: string (nullable = true)
|-- raw: string (nullable = true)

我现在有大量具有不正确架构的消息。我已尝试将正确的架构应用于我现在拥有的消息,但是写入文件系统的消息集具有可变架构(嵌套列中有变化incidentMessage) 并且这种方法不起作用(我搞砸了,应该使用 Avro)

有没有办法恢复这些数据并使其格式正确?

最佳答案

虽然创建只有 1 个字段的结构没有多大意义,但您可以使用 struct 函数来实现:

import org.apache.spark.sql.functions.struct

df.withColumn("incidentMessage",struct($"incidentMessage"))

关于scala - 如何在 Spark 中将 Dataframe 的 String 列转换为 Struct,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59369985/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com