gpt4 book ai didi

mongodb - 在scala中将dataframe转换为json

转载 作者:行者123 更新时间:2023-12-03 15:59:43 25 4
gpt4 key购买 nike

假设我有一个 wordcount 示例,其中我在一列中获取一个 dataframe 作为 word,在另一列中获取 wordcount,我想收集相同的数据并将其存储为 mongo 集合中的 json 数组。

eg for dataframe:
|Word | Count |
| abc | 1 |
| xyz | 23 |

我应该得到像这样的json:

{words:[{word:"abc",count:1},{word:"xyz",count:23}]}

当我在数据帧上尝试 .toJSON 并将值作为列表收集并将其添加到数据帧中时,存储在我的 mongo 中的结果是字符串集合而不是 JSON 集合。

使用的查询:

explodedWords1.toJSON.toDF("words").agg(collect_list("words")).toDF("words")

result : "{\"words\":[{\"word\":\"abc\",\"count\":1},{\"word\":\"xyz\",\"count\":23}]}"

我是 Scala 新手。任何帮助都会很好。 (如果未使用外部包,将会很有帮助)。

最佳答案

将数据帧中的数据存储到 Mongo 中的绝对最佳方法是使用MongoDB Spark 连接器 (https://docs.mongodb.com/spark-connector/master/)

只需将 "org.mongodb.spark"%% "mongo-spark-connector"% "2.2.0" 添加到您的 sbt 依赖项并检查下面的代码

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession


val spark = SparkSession.builder()
.master("local[2]")
.appName("test")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/dbname")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/dbname")
.getOrCreate()

import spark.implicits._

val explodedWords1 = List(
("abc",1),
("xyz",23)
).toDF("Word", "Count")

MongoSpark.save(explodedWords1.write.option("collection", "wordcount").mode("overwrite"))

但是,如果您确实希望将结果作为单个 json 文件,那么下面的脚本应该可以做到:

explodedWords1.repartition(1).write.json("/tmp/wordcount")

最后,如果您希望 json 作为 scala 中的字符串列表,只需使用

explodedWords1.toJSON.collect()
<小时/>

更新:

我没有看到您希望将所有条记录聚合到一个字段(“单词”)

如果您使用下面的代码,则上述所有三种方法仍然有效(将 explodedWords1aggregate 交换)

import org.apache.spark.sql.functions._

val aggregated = explodedWords1.agg(
collect_list(map(lit("word"), 'Word, lit("count"), 'Count)).as("words")
)
<小时/>

选项 1:explodedWords1

explodedWords1

选项 2:聚合

aggregated

关于mongodb - 在scala中将dataframe转换为json,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49814098/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com