gpt4 book ai didi

python - 将 Spark 数据帧写为 json 数组(pyspark)

转载 作者:行者123 更新时间:2023-12-04 21:01:48 24 4
gpt4 key购买 nike

我想将我的 spark 数据帧编写为一组 JSON 文件,特别是每个文件作为一个 JSON 数组。
让我用一个简单的(可重现的)代码来解释。

我们有:

import numpy as np
import pandas as pd
df = spark.createDataFrame(pd.DataFrame({'x': np.random.rand(100), 'y': np.random.rand(100)}))

将数据帧保存为:
df.write.json('s3://path/to/json')

刚创建的每个文件每行都有一个 JSON 对象,例如:
{"x":0.9953802385540144,"y":0.476027611419198}
{"x":0.929599290575914,"y":0.72878523939521}
{"x":0.951701684432855,"y":0.8008064729546504}

但我想要一个包含这些 JSON 的数组 每个文件 :
[
{"x":0.9953802385540144,"y":0.476027611419198},
{"x":0.929599290575914,"y":0.72878523939521},
{"x":0.951701684432855,"y":0.8008064729546504}
]

最佳答案

目前不可能让 spark“本地”以您想要的格式写入单个文件,因为 spark 以分布式(并行)方式工作,每个执行器独立写入其部分数据。

但是,既然您是 okay with having each file be an array of json not only [one] file ,这是您可以用来实现所需输出的一种解决方法:

from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

df.select(to_json(struct(*df.columns)).alias("json"))\
.groupBy(spark_partition_id())\
.agg(collect_list("json").alias("json_list"))\
.select(col("json_list").cast("string"))\
.write.text("s3://path/to/json")

首先你创建一个 json来自 df 中的所有列.然后按 spark 分区 ID 分组并使用 collect_list 进行聚合。 .这将把所有 json将该分区上的 s 放入一个列表中。由于您在分区内聚合,因此不需要对数据进行混洗。

现在选择列表列,转换为字符串,并将其写入文本文件。

这是一个文件外观的示例:
[{"x":0.1420523746714616,"y":0.30876114874052263}, ... ]

请注意,您可能会得到一些空文件。

如果您指定了一个空 groupBy,大概您可以强制 spark 将数据写入一个文件中。 ,但这会导致将所有数据强制放入单个分区,从而导致内存不足错误。

关于python - 将 Spark 数据帧写为 json 数组(pyspark),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58238563/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com