
pyspark - How do I iterate over json files in a code repository and incrementally append to a dataset


I have ingested a dataset of 100,000 raw json files (about 100 GB in total) into Foundry via Data Connection. I want to use Python Transforms raw file access to read the files and flatten the structs and arrays of structs into a dataframe, writing the result as incremental updates to the output dataset.
I would like to take the example below from the documentation, adapt it to *.json files, and convert it to an incrementally updated dataset using the @incremental() decorator.

import csv
from pyspark.sql import Row
from transforms.api import transform, Input, Output


@transform(
    processed=Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
)
def example_computation(hair_eye_color, processed):

    def process_file(file_status):
        with hair_eye_color.filesystem().open(file_status.path) as f:
            r = csv.reader(f)

            # Construct a pyspark.Row from our header row
            header = next(r)
            MyRow = Row(*header)

            for row in csv.reader(f):
                yield MyRow(*row)

    files_df = hair_eye_color.filesystem().files('**/*.csv')
    processed_df = files_df.rdd.flatMap(process_file).toDF()
    processed.write_dataframe(processed_df)
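For the incremental part of the question, a minimal sketch of what such a conversion might look like for JSON files is shown below. It is an assumption, not code from the documentation: it assumes the @incremental() decorator from transforms.api, that the input filesystem then exposes only files added since the last build, that the output appends rather than replaces, and that each file holds either a single flat JSON object or a list of them. The dataset paths are placeholders.

import json
from pyspark.sql import Row
from transforms.api import transform, incremental, Input, Output


@incremental()  # assumption: input shows only newly added files, output appends
@transform(
    processed=Output('/examples/json_flattened'),   # placeholder path
    raw_json=Input('/examples/raw_json_files'),     # placeholder path
)
def example_computation(raw_json, processed):

    def process_file(file_status):
        # Runs on the executors: parse one file, yield one Row per record.
        with raw_json.filesystem().open(file_status.path) as f:
            data = json.load(f)
            records = data if isinstance(data, list) else [data]
            for record in records:
                yield Row(**record)

    files_df = raw_json.filesystem().files('**/*.json')
    processed_df = files_df.rdd.flatMap(process_file).toDF()
    processed.write_dataframe(processed_df)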
With the help of @Jeremy David Gamet I was able to develop code that produces the dataset I wanted.
from transforms.api import transform, Input, Output
from pyspark import *
import json


@transform(
    out=Output('foundry/outputdataset'),
    inpt=Input('foundry/inputdataset'),
)
def update_set(ctx, inpt, out):
    spark = ctx.spark_session
    sc = spark.sparkContext

    filesystem = list(inpt.filesystem().ls())
    file_dates = []
    for files in filesystem:
        with inpt.filesystem().open(files.path, 'r', encoding='utf-8-sig') as fi:
            data = json.load(fi)
            file_dates.append(data)

    json_object = json.dumps(file_dates)
    df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))

    df_2 = df_2.drop_duplicates()
    # flatten() comes from the linked "Flatten array column" answer (see below)
    df_2 = flatten(df_2)
    out.write_dataframe(df_2)

The flatten() helper used above comes from the linked "Flatten array column" answer.
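Since that linked code is not reproduced here, a minimal sketch of such a flatten helper (repeatedly expanding struct columns and exploding array columns) might look like the following; this is an assumption, not the exact code from the linked answer:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType


def flatten(df):
    # Repeat until no struct or array columns remain in the schema.
    complex_cols = {f.name: f.dataType for f in df.schema.fields
                    if isinstance(f.dataType, (StructType, ArrayType))}
    while complex_cols:
        name, dtype = next(iter(complex_cols.items()))
        if isinstance(dtype, StructType):
            # Promote each struct field to a top-level column: a.b -> a_b
            expanded = [F.col(name + '.' + sub.name).alias(name + '_' + sub.name)
                        for sub in dtype.fields]
            df = df.select('*', *expanded).drop(name)
        else:
            # One output row per array element; keeps rows with null/empty arrays.
            df = df.withColumn(name, F.explode_outer(name))
        complex_cols = {f.name: f.dataType for f in df.schema.fields
                        if isinstance(f.dataType, (StructType, ArrayType))}
    return df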
The code above works for a small number of files, but with more than 100,000 files I run into the following error:
Connection To Driver Lost 

This error indicates that connection to the driver was lost unexpectedly, which is often caused by the driver being terminated due to running out of memory. Common reasons for driver out-of-memory (OOM) errors include functions that materialize data to the driver such as .collect(), broadcasted joins, and using Pandas dataframes.

Is there any way around this?

Best Answer

I have given an example of how this can be done dynamically as an answer to another question.
Here is the link to that answer: How to union multiple dynamic inputs in Palantir Foundry? And a copy of the same code:

from transforms.api import Input, Output, transform
from pyspark.sql import functions as F
import json
import logging


def transform_generator():
    transforms = []
    transf_dict = {## enter your dynamic mappings here ##}

    for value in transf_dict:
        @transform(
            out=Output(' path to your output here '.format(val=value)),
            inpt=Input(" path to input here ".format(val=value)),
        )
        def update_set(ctx, inpt, out):
            spark = ctx.spark_session
            sc = spark.sparkContext

            filesystem = list(inpt.filesystem().ls())
            file_dates = []
            for files in filesystem:
                with inpt.filesystem().open(files.path) as fi:
                    data = json.load(fi)
                    file_dates.append(data)

            logging.info('info logs:')
            logging.info(file_dates)
            json_object = json.dumps(file_dates)
            df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
            df_2 = df_2.withColumn('upload_date', F.current_date())

            df_2 = df_2.drop_duplicates()
            out.write_dataframe(df_2)
        transforms.append(update_set)
    return transforms


TRANSFORMS = transform_generator()
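As an illustration of what the dynamic mapping does, a hypothetical transf_dict and the paths it would produce are sketched below; the keys and paths are made up, not part of the original answer:

# Hypothetical mapping: each key is used only to build one input/output path pair.
transf_dict = {
    'orders': 'orders',
    'customers': 'customers',
}

# With value = 'orders', the decorator arguments above would resolve to e.g.
#   Output('/project/output/orders')
#   Input('/project/input/orders')
# so transform_generator() registers one transform per entry in transf_dict.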
Please let me know if there is anything I can clarify.
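The question's driver OOM comes from parsing every file with json.load() on the driver before handing anything to Spark. A minimal sketch of keeping the parsing on the executors, following the raw-file-access pattern from the documentation example, is shown below; the paths are the question's own placeholders and flatten() is the helper sketched earlier, so treat this as an assumption rather than the accepted approach:

import json
from transforms.api import transform, Input, Output


@transform(
    out=Output('foundry/outputdataset'),
    inpt=Input('foundry/inputdataset'),
)
def update_set(ctx, inpt, out):

    def parse_file(file_status):
        # Runs on the executors, so no single process holds all 100,000 files.
        with inpt.filesystem().open(file_status.path, 'r', encoding='utf-8-sig') as f:
            data = json.load(f)
            records = data if isinstance(data, list) else [data]
            for record in records:
                # Re-serialize each record; spark.read.json infers the schema
                # from the distributed RDD of JSON strings.
                yield json.dumps(record)

    files_df = inpt.filesystem().files('**/*.json')
    df = ctx.spark_session.read.json(files_df.rdd.flatMap(parse_file))

    df = flatten(df)  # flatten helper sketched earlier
    df = df.drop_duplicates()
    out.write_dataframe(df)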

About "pyspark - How do I iterate over json files in a code repository and incrementally append to a dataset": the original question is on Stack Overflow: https://stackoverflow.com/questions/66815059/
