gpt4 book ai didi

scala - 使用Spark结构化流处理包含嵌套实体的JSON

转载 作者:行者123 更新时间:2023-12-01 03:06:45 25 4
gpt4 key购买 nike

我想使用Spark结构化流从Kafka主题源中读取嵌套数据。
我的Scala代码(案例类和Spark处理代码):

case class Nested(attr_int: Integer, attr_string: String, attr_float: Float, attr_timestamp: java.sql.Timestamp)

case class Parent(a_str: String, a_long: Long, a_nested: Array[Nested])


import org.apache.spark.sql.Encoders
val jsonSchema = Encoders.product[Parent].schema

val df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "testnested")
.option("group.id", "testnested")
.option("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
.option("value.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
.load()
.select($"value" cast "string" as "json")
.select(from_json($"json", jsonSchema) as "data")
.select("data.*")
.withColumn("nested", explode($"a_nested"))
.select("nested.*")
.as[Nested]
.writeStream
.format("console")
.start()
.awaitTermination()


当我发送给Kafka数据时:

{"a_str":"Str","a_long":100,"a_nested":[{"attr_int":0,"attr_string":"nested_0","attr_float":0.0,"attr_timestamp":"2018-01-01T11:00:00.123321+02:00"},{"attr_int":1,"attr_string":"nested_1","attr_float":1.0,"attr_timestamp":"2018-02-02T12:01:01.023321+02:00"}]}


我得到结果:

+--------+-----------+----------+--------------------+
|attr_int|attr_string|attr_float| attr_timestamp|
+--------+-----------+----------+--------------------+
| 0| nested_0| 0.0|2018-01-01 13:02:...|
| 1| nested_1| 1.0|2018-02-02 14:01:...|
+--------+-----------+----------+--------------------+


现在,我想将每个嵌套项目加入父数据,例如:

+--------+-----------+----------+--------------------+-------+--------+
|attr_int|attr_string|attr_float| attr_timestamp| a_str | a_long |
+--------+-----------+----------+--------------------+-------+--------+
| 0| nested_0| 0.0|2018-01-01 13:02:...| Str | 100 |
| 1| nested_1| 1.0|2018-02-02 14:01:...| Str | 100 |
+--------+-----------+----------+--------------------+-------+--------+


请注意, "a_str""a_long"是父实体 "Parent"的列。
由于我不是Spark结构化流处理专家,因此我想知道最“惯用”的方法是什么?
目前,我有一些假设:


创建自定义Kafka值反序列化器
在结构化流上写一些连接(我坚持使用),但是我想这将需要更改json结构(例如,在嵌套中指定一些键值)
指向父数据)
编写自定义方法,该方法将为联接的实体返回非规范化数据,并在此方法中使用 flatMap


请指教。

谢谢

更新1:为方便起见,我在GitHub上创建了一个通讯项目: https://github.com/lospejos/spark-nested-classes-from-json

最佳答案

感谢Glennie Helles Sindholt和其他Google员工的利益:

.select($"nested.*", $"a_str", $"a_long")


Github存储库也已更新。

关于scala - 使用Spark结构化流处理包含嵌套实体的JSON,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53168209/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com