gpt4 book ai didi

apache-spark - 如何使用 flatMap() 在 PySpark 中并行列出 S3 对象?

转载 作者:行者123 更新时间:2023-12-04 15:53:11 30 4
gpt4 key购买 nike

我有一个数据框,其中每一行都包含一个指向 S3 中某个位置的前缀。我想使用 flatMap() 遍历每一行,在每个前缀中列出 S3 对象,并返回一个新的数据框,其中包含 S3 中列出的每个文件的一行。

我有这个代码:

import boto3
s3 = boto3.resource('s3')

def flatmap_list_s3_files(row):
bucket = s3.Bucket(row.bucket)
s3_files = []
for obj in bucket.objects.filter(Prefix=row.prefix):
s3_files.append(obj.key)

rows = []
for f in s3_files:
row_dict = row.asDict()
row_dict['s3_obj'] = f
rows.append(Row(**row_dict))
return rows

df = <code that loads the dataframe>
df.rdd.flatMap(lambda x: flatmap_list_s3_files(x))).toDF()

我猜唯一的问题是 s3 对象不是可腌制的?所以我收到了这个错误,我不确定接下来要尝试什么:

PicklingError:无法 pickle 未打开以供阅读的文件

我是一个 spark noob,所以我希望有一些其他的 API 或一些方法来并行化 S3 中的文件列表并将其与原始数据框连接在一起。明确地说,我并没有尝试读取 S3 文件本身中的任何数据,我正在构建一个表,该表本质上是 S3 中所有文件的元数据目录。任何提示将不胜感激。

最佳答案

你不能在你的 spark 集群周围发送 s3 客户端;您需要共享创建一个所需的所有信息并在远端对其进行实例化。我不知道 .py 但在 Java API 中你只需将路径作为字符串传递,然后将其转换为 Path 对象,调用 Path.getFileSystem() 并在那里工作. Spark 工作人员将缓存文件系统实例以快速重用

关于apache-spark - 如何使用 flatMap() 在 PySpark 中并行列出 S3 对象?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53092928/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com