gpt4 book ai didi

python - 将符合上次修改窗口的 S3 文件读入 DataFrame

转载 作者:行者123 更新时间:2023-12-01 06:22:10 24 4
gpt4 key购买 nike

我有一个 S3 存储桶,其中的对象的“上次修改时间”范围从非常旧到当前。我需要能够在窗口中找到具有上次修改标记的文件,然后将这些文件(JSON 格式)读入某种数据帧(pandas、spark 等)中。

我尝试收集文件,单独读取它们并通过以下代码附加,但速度非常慢:

session = boto3.session.Session(region_name=region)

#Gather all keys that have a modified stamp between max_previous_data_extracted_timestamp and start_time_proper
s3 = session.resource('s3', region_name=region)
bucket = s3.Bucket(args.sourceBucket)
app_body = []
for obj in bucket.objects.all():
obj_datetime = obj.last_modified.replace(tzinfo=None)
if args.accountId + '/Patient' in obj.key and obj_datetime > max_previous_data_extracted_timestamp_datetime and obj_datetime <= start_time_datetime:
obj_df = pd.read_csv(obj.get()['Body'])
app_body.append(obj_df)

merged_dataframe = pd.concat(app_body)

逻辑是有效的,因为我只获取窗口内已修改的对象,但是,获取主体并附加到列表的下一部分在约 10K 文件上运行 30-45 分钟。必须有一种我没有想到的更好的方法来做到这一点。

最佳答案

Spark 是实现这一目标的一种方式。

当与包含大量文件的 S3 存储桶交谈时,我们始终需要记住,列出存储桶中的所有对象的成本很高,因为它一次返回 1000 个对象以及用于获取下一组对象的指针。这使得并行化变得非常困难,除非您了解结构并使用它来优化这些调用。

如果代码不起作用,我很抱歉,我使用 scala,但这应该几乎处于工作状态。

知道您的结构是bucket/account_identifier/Patient/Patient_identifier:

# account_identifiers -- provided from DB
accounts_df = sc.parallelize(account_identifiers, number_of_partitions)
paths = accounts_df.mapPartitions(fetch_files_for_account).collect()
df = spark.read.json(paths)


def fetch_files_for_account(accounts):
s3 = boto3.client('s3')
result = []
for a in accounts:
marker = ''
while True:
request_result = s3.list_objects(Bucket=args.sourceBucket, Prefix=a)
items = request_result['Contents']
for i in items:
obj_datetime = i['LastModified'].replace(tzinfo=None)
if obj_datetime > max_previous_data_extracted_timestamp_datetime and obj_datetime <= start_time_datetime:
result.append('s3://' + args.sourceBucket +'/' + i['Key'])
if not request_result['IsTruncated']:
break
else:
marker = request_result['Marker']
return iter(result)

映射分区将确保您没有实例化太多客户端。您可以使用 number_of_partitions 控制该数量。

您可以做的另一个优化是在调用 mapPartitions 后手动加载内容,而不是使用 collect()。在该阶段之后,您将获得 JSON 内容的 String,然后调用 spark.createDataFrame(records, schema)。注意:您必须提供架构。

如果您没有 account_identifiers 或文件数量不会达到 100k 范围,您将必须列出存储桶中的所有对象,按 last_modified 进行过滤,基本上执行相同的调用:

spark.read.json(paths)

关于python - 将符合上次修改窗口的 S3 文件读入 DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60310736/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com