gpt4 book ai didi

java - jsontostructs 到 spark 结构化流中的行

转载 作者:塔克拉玛干 更新时间:2023-11-03 05:04:43 27 4
gpt4 key购买 nike

我正在使用 Spark 2.2,我正在尝试从 Kafka 读取 JSON 消息,将它们转换为 DataFrame 并将它们作为 Row:

spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic")
.load()
.select(col("value").cast(StringType).as("col"))
.writeStream()
.format("console")
.start();

有了这个我可以实现:

+--------------------+
| col|
+--------------------+
|{"myField":"somet...|
+--------------------+

我想要更像这样的东西:

+--------------------+
| myField|
+--------------------+
|"something" |
+--------------------+

我尝试使用 struct 来使用 from_json 函数:

DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("myField", DataTypes.StringType)
}
)

但我只得到:

+--------------------+
| jsontostructs(col)|
+--------------------+
|[something] |
+--------------------+

然后我尝试使用 explode 但我只得到异常说:

cannot resolve 'explode(`col`)' due to data type mismatch: 
input to function explode should be array or map type, not
StructType(StructField(...

知道如何让它发挥作用吗?

最佳答案

您就快完成了,只需选择正确的选项即可。 from_json 返回与模式匹配的 struct 列。如果架构(JSON 表示)如下所示:

{"type":"struct","fields":[{"name":"myField","type":"string","nullable":false,"metadata":{}}]}

你会得到等价于的嵌套对象:

root
|-- jsontostructs(col): struct (nullable = true)
| |-- myField: string (nullable = false)

您可以使用 getField (或getItem)方法来选择特定字段

df.select(from_json(col("col"), schema).getField("myField").alias("myField"));

.* 选择 struct 中的所有顶级字段:

df.select(from_json(col("col"), schema).alias("tmp")).select("tmp.*");

虽然对于单个 string 列,get_json_object应该绰绰有余:

df.select(get_json_object(col("col"), "$.myField"));

关于java - jsontostructs 到 spark 结构化流中的行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46714561/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com