gpt4 book ai didi

apache-spark - 结构化流并将嵌套数据拆分为多个数据集

转载 作者:行者123 更新时间:2023-12-02 12:10:59 25 4
gpt4 key购买 nike

我正在使用 Spark 的结构化流 (2.2.1),使用 Kafka 每 60 秒从传感器接收数据。我在思考如何打包这些 Kafka 数据以便能够正确处理它时遇到了麻烦。

当数据通过 Kafka 传入时,我需要能够进行一些计算。

我的问题是将来自 Kafka 的 JSON 数据解压到我可以使用的数据集中

数据

简化的数据如下所示:

{
id: 1,
timestamp: "timestamp"
pump: {
current: 1.0,
flow: 20.0
torque: 5.0
},
reactors: [
{
id: 1,
status: 200,
},

{
id: 2,
status: 300,
}
],
settings: {
pumpTimer: 20.0,
reactorStatusTimer: 200.0
}
}

为了能够在 Spark 中使用,我为其中的每一个创建了一些案例类结构:

// First, general package
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)

并使用以下方式生成架构:

val rawDataSchema = Encoders.product[RawData].schema

原始数据到 Spark Schema

首先,我将 Kafka 中的“值”字段放入我的通用架构中:

val rawDataSet = df.select($"value" cast "string" as "json")
.select(from_json($"json", rawDataSchema))
.select("data.*").as[RawData]

使用这个 rawDataSet,我可以将每个单独的对象打包到数据集中。

val pump = rawDataSet.select(from_json($"pump", pumpSchema) as 'pumpData)
.select("pumpData.*").as[Pump]

val settings = rawDataSet.select(from_json($"settings", settingsSchema) as 'settingsData)
.select("settingsData.*").as[Settings]

这为我提供了每个 JSON 对象漂亮且干净的数据集。

处理数据

这是我的问题,例如,如果我想比较或计算设置和泵的两个数据集之间的某些值,则 JOIN 无法使用结构化流处理。

val joinedData = pump.join(settings)

错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;

我的做法是错误的吗?或者有什么建议可以替代方法来处理这个问题吗?

谢谢

最佳答案

我将用我现在工作的解决方案回答我自己的问题

我可以将这些对象作为一个带有嵌套对象的案例类连接在一起,而不是为 JSON 中的每个对象创建案例类:

case class RawData(
id: String,
timestamp: String,
pump: Pump,
reactors: Array[Reactor],
settings: Settings
)

case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)

为了将其变成可用的数据集,我可以简单地调用

val rawDataset = df.select($"value" cast "string" as "json")
.select(from_json($"json", Encoders.product[RawData].schema) as 'data)
.select("data.*").as[RawData]
.withColumn("reactor", explode($"reactors")) // Handles the array of reactors, making one row in the dataset per reactor.

处理完 JSON 并将其放入我的定义模式后,我可以像这样选择每个特定的传感器:

val tester = rawDataset.select($"pump.current", $”settings.pumpTimer”)

谢谢user6910411为我指明了正确的方向

关于apache-spark - 结构化流并将嵌套数据拆分为多个数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49597331/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com