gpt4 book ai didi

PySpark 并行读取多个文件

转载 作者:行者123 更新时间:2023-12-02 01:58:29 27 4
gpt4 key购买 nike

我的项目中有以下需求,我们正在尝试使用 PySpark 进行数据处理。

我们过去常常以每辆车的 Parquet 文件形式接收传感器数据,并且每辆车都有一个文件。该文件有很多传感器,但其结构化数据为 Parquet 格式。每个文件的平均文件大小为 200MB。

假设我批量收到如下文件并准备进行处理。

训练文件大小日期

X1 210MB 2018 年 9 月 5 日上午 12:10

X1 280MB 2018 年 9 月 5 日下午 5:10

Y1 220MB 2018 年 9 月 5 日上午 04:10

Y1 241MB 2018 年 9 月 5 日下午 06:10

在处理结束时,我需要从每个源文件接收一个聚合的 .csv 文件,或者接收一个包含所有这些车辆聚合数据的主文件。

我知道HDFS默认 block 大小是128MB,每个文件将被分成2个 block 。我可以知道如何使用 PySpark 来完成此要求吗?是否可以并行处理所有这些文件?

请告诉我你的想法

最佳答案

我也遇到了类似的问题,看来我找到了办法:1. 获取文件列表2.并行化这个列表(分布在所有节点之间)3.编写一个函数,从分发到节点的大列表部分中读取所有文件的内容4.使用mapPartition运行它,然后将结果收集为一个列表,每个元素是每个文件的收集内容。存储在 AWS s3 上的 Fot 文件和 json 文件:

def read_files_from_list(file_list):
#reads files from list
#returns content as list of strings, 1 json per string ['{}','{}',...]
out=[]
for x in file_list:
content = sp.check_output([ 'aws', 's3', 'cp', x, '-']) # content of the file. x here is a full path: 's3://bucket/folder/1.json'
out.append(content)
return out #content of all files from the file_list as list of strings, 1 json per string ['{}','{}',...]


file_list=['f1.json','f2.json',...]
ps3="s3://bucket/folder/"
full_path_chunk=[ps3 + f for f in file_list] #makes list of strings, with full path for each file
n_parts = 100
rdd1 = sc.parallelize(full_path_chunk, n_parts ) #distribute files among nodes
list_of_json_strings = rdd1.mapPartitions(read_files_from_list).collect()

然后,如果需要,您可以像这样创建 Spark 数据框:

rdd2=sc.parallelize(list_of_json_strings) #this is a trick! via http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
df_spark=sqlContext.read.json(rdd2)

函数read_files_from_list只是一个示例,应该将其更改为使用python工具从hdfs读取文件。希望这有帮助:)

关于PySpark 并行读取多个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52179518/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com