- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 S3 存储桶,其中的对象的“上次修改时间”范围从非常旧到当前。我需要能够在窗口中找到具有上次修改标记的文件,然后将这些文件(JSON 格式)读入某种数据帧(pandas、spark 等)中。
我尝试收集文件,单独读取它们并通过以下代码附加,但速度非常慢:
session = boto3.session.Session(region_name=region)
#Gather all keys that have a modified stamp between max_previous_data_extracted_timestamp and start_time_proper
s3 = session.resource('s3', region_name=region)
bucket = s3.Bucket(args.sourceBucket)
app_body = []
for obj in bucket.objects.all():
obj_datetime = obj.last_modified.replace(tzinfo=None)
if args.accountId + '/Patient' in obj.key and obj_datetime > max_previous_data_extracted_timestamp_datetime and obj_datetime <= start_time_datetime:
obj_df = pd.read_csv(obj.get()['Body'])
app_body.append(obj_df)
merged_dataframe = pd.concat(app_body)
逻辑是有效的,因为我只获取窗口内已修改的对象,但是,获取主体并附加到列表的下一部分在约 10K 文件上运行 30-45 分钟。必须有一种我没有想到的更好的方法来做到这一点。
最佳答案
Spark 是实现这一目标的一种方式。
当与包含大量文件的 S3 存储桶交谈时,我们始终需要记住,列出存储桶中的所有对象的成本很高,因为它一次返回 1000 个对象以及用于获取下一组对象的指针。这使得并行化变得非常困难,除非您了解结构并使用它来优化这些调用。
如果代码不起作用,我很抱歉,我使用 scala,但这应该几乎处于工作状态。
知道您的结构是bucket/account_identifier/Patient/Patient_identifier
:
# account_identifiers -- provided from DB
accounts_df = sc.parallelize(account_identifiers, number_of_partitions)
paths = accounts_df.mapPartitions(fetch_files_for_account).collect()
df = spark.read.json(paths)
def fetch_files_for_account(accounts):
s3 = boto3.client('s3')
result = []
for a in accounts:
marker = ''
while True:
request_result = s3.list_objects(Bucket=args.sourceBucket, Prefix=a)
items = request_result['Contents']
for i in items:
obj_datetime = i['LastModified'].replace(tzinfo=None)
if obj_datetime > max_previous_data_extracted_timestamp_datetime and obj_datetime <= start_time_datetime:
result.append('s3://' + args.sourceBucket +'/' + i['Key'])
if not request_result['IsTruncated']:
break
else:
marker = request_result['Marker']
return iter(result)
映射分区将确保您没有实例化太多客户端。您可以使用 number_of_partitions
控制该数量。
您可以做的另一个优化是在调用 mapPartitions
后手动加载内容,而不是使用 collect()
。在该阶段之后,您将获得 JSON 内容的 String
,然后调用 spark.createDataFrame(records, schema)
。注意:您必须提供架构。
如果您没有 account_identifiers
或文件数量不会达到 100k 范围,您将必须列出存储桶中的所有对象,按 last_modified
进行过滤,基本上执行相同的调用:
spark.read.json(paths)
关于python - 将符合上次修改窗口的 S3 文件读入 DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60310736/
假设您有 2 个文件,如下所示。 file_1_october.csv file_2_november.csv 文件具有相同的列。所以我想在 R 中读取这两个文件,我可以使用 map 轻松完成。我还想
我有一个制表符分隔的文本文件: 0730000 John 1 01 225 000 000 当我将它读入 R 时 stud_stats data.table::f
似乎最直观的是 .rdata 文件可能是 R 加载的快速文件格式,但是在扫描一些堆栈帖子时,似乎更多的注意力集中在提高 .csv 或其他格式的加载时间上。有确定的答案吗? 最佳答案 不是一个明确的答案
我是 R 的新手,目前在读取 .csv 文件并将其转换为 data.frame 时遇到了很多麻烦7 列。这是我正在做的: gene_symbols_table head(gene_symbols_t
基本上我有一个格式如下所示的 csv: csv 有 11 列,前五列和后五列完全相同。我希望能够读取 csv 并将第一列和第五列(期间和支出)的所有实例存储在一个列表中,它们具有值,并对另一个列表中的
我对 Julia 比较陌生,正在寻找一种有效的方法来从文本文件中读取并将每个“列”存储在数组中(我有 2 列,但通用解决方案也很棒)。例如,我想要输入 1 2 3 4 5 6
基本上我有一个格式如下所示的 csv: csv 有 11 列,前五列和后五列完全相同。我希望能够读取 csv 并将第一列和第五列(期间和支出)的所有实例存储在一个列表中,它们具有值,并对另一个列表中的
我的程序分配了一个 32 位 int,随后尝试使用 read(2) 从套接字将 4 个字节读入 int 有时读取不完整并返回读取 2 个字节。有什么方法可以从中恢复吗?我想我必须在 int 的中途生成
我有大量的 CSV 文件。有些标题从第一行开始,其他标题从第 3 行开始,其他的从第 7 行开始,依此类推。 标题看起来都一样,它们只是从不同文件的不同行开始。有没有办法有条件地 read.csv 文
我写了一个小程序来从 csv 文件中读取数据: using System; using System.Collections.Generic; using System.Linq; using Sys
我需要读入一个包含 10,000 个整数的列表,并将它们按升序放置在一个 vector 中。请注意,我不是在然后阅读排序,而是在同时阅读时排序。 我这样做是为了学习。我意识到阅读时排序是 O(n^2)
我有一个问题。不幸的是,我没有找到任何答案。如何将参数传递给脚本,这是另一个命令的结果。例如: ls | ./myscript.sh 我想将 ls 的结果传递给 myscript。如果我执行上面的命
我在读取扩展 ASCII 字符并将其转换为十进制值时遇到问题。我试过这样做: unsigned char temp; while(temp = cin.get != EOF) { cout << (i
我已经通过以下命令加载了文本文件。我想从 contents 中删除由 \n 分隔的第一行标题行。怎么做? txtfile = open(filepath, "rt") contents = txtfi
希望一切顺利...我正在将数据集输入到 sklearn 算法中进行分类,但找不到任何简单的数据集来开始,所以我自己制作了数据集。但有一个问题... import numpy as np import
我有一个 .csv 文件,它有 3 行和 5 列,值为 0、1、2、3、50 或 100。我将它从 Excel 工作表保存到 .csv 文件。我正在尝试使用 C++ 读取 .csv 文件,并根据最后三
我有一个 HTML 文件,它将作为我要发送的电子邮件的模板。 html 中有一些字段是可变的。我想知道是否有一种可靠的方法可以用变量替换 HTML 文件中的占位符。我知道我可以 string.Repl
我从未使用过 JSON 文件,但我有实现 JSON 文件的任务,我需要将其转换为 IEnumerable。当我尝试对 JSON 对象进行反序列化时,我得到一个异常,上面写着: An unhandled
我正在尝试阅读 IFormFile从这样的 HTTP POST 请求中收到: public async Task UploadDocument([FromForm]DataWrapper data)
我有一个包含大量多行文本 block 的文件。我想将该文件读入一个字符向量列表——每个 block 一个。我对 scan()、read.table() 等函数的文档的阅读似乎表明一行的结尾将结束向量。
我是一名优秀的程序员,十分优秀!