
java - Structured Streaming: saving JSON to HDFS

Reposted. Author: 行者123. Last updated: 2023-12-02 02:30:37

My Spark Structured Streaming program reads JSON data from Kafka and writes it to HDFS in JSON format. The JSON does get saved to HDFS, but each record is wrapped under the key "jsontostructs(CAST(value AS STRING))", like this:

{"jsontostructs(CAST(value AS STRING))":{"age":42,"name":"John"}}

How can I save only the following?

{"age":42,"name":"John"}




StructType schema = kafkaPrimerRow.schema();

// Read JSON from Kafka. JSON is: {"age":42,"name":"John"}
Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", input_bootstrap_server)
    .option("subscribe", topics[0])
    .load();

// Save stream to HDFS
StreamingQuery ds = df
    .select(functions.from_json(col("value").cast(DataTypes.StringType), schema))
    .writeStream()
    .format("json")
    .outputMode(OutputMode.Append())
    .option("path", destPath)
    .option("checkpointLocation", checkpoint)
    .start();

Best answer

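Adding .select("data.*") as shown below did the trick. from_json returns a single struct column, so the JSON writer nests every record under that column's auto-generated name. Aliasing the struct as "data" and then selecting "data.*" promotes its fields (age, name) to top-level columns.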

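StreamingQuery ds = df
    // Parse the Kafka value as JSON and name the resulting struct column "data"
    .select(functions.from_json(col("value").cast(DataTypes.StringType), schema).as("data"))
    // Expand the struct so that its fields become top-level columns
    .select("data.*")
    .writeStream()
    .format("json")
    .outputMode(OutputMode.Append())
    .option("path", destPath)
    .option("checkpointLocation", checkpoint)
    .start();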
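
To make the effect of .select("data.*") concrete, here is a small plain-Java sketch that models a row as a nested map and promotes the fields of one struct column to the top level. It is only a conceptual illustration and does not use Spark; the class StructExpansionSketch and the method expandStruct are hypothetical names invented for this example, not part of any Spark API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual model only: rows are plain maps, and a "struct column" is a nested map.
// StructExpansionSketch and expandStruct are hypothetical names, not Spark APIs.
public class StructExpansionSketch {

    // Mimics the idea of select("<structColumn>.*"): the result contains
    // only the fields of the named struct, as top-level entries.
    @SuppressWarnings("unchecked")
    static Map<String, Object> expandStruct(Map<String, Object> row, String structColumn) {
        Object value = row.get(structColumn);
        if (!(value instanceof Map)) {
            throw new IllegalArgumentException("Not a struct column: " + structColumn);
        }
        return new LinkedHashMap<>((Map<String, Object>) value);
    }

    public static void main(String[] args) {
        // The parsed JSON, held in a single struct column named "data"
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("age", 42);
        data.put("name", "John");

        Map<String, Object> row = new LinkedHashMap<>();
        row.put("data", data);

        System.out.println(row);                       // {data={age=42, name=John}}
        System.out.println(expandStruct(row, "data")); // {age=42, name=John}
    }
}
```

The nested form corresponds to what the question's query writes (one wrapper key around the record), and the expanded form corresponds to the output once the struct's fields are selected directly.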

This post, java - Structured Streaming: saving JSON to HDFS, is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/57231893/
