gpt4 book ai didi

database - 内存高效的大型数据集流式传输到 S3

转载 作者:搜寻专家 更新时间:2023-10-30 20:24:03 25 4
gpt4 key购买 nike

我正在尝试使用 SQL Alchemy 复制 S3 大型数据集(大于 RAM)。我的约束是:

  1. 我需要使用 sqlalchemy
  2. 我需要将内存压力保持在最低水平
  3. 我不想使用本地文件系统作为向 s3 发送数据的中间步骤

我只想以一种内存高效的方式将数据从数据库传输到 S3

我可以正常处理数据集(使用以下逻辑),但对于更大的数据集,我遇到了缓冲区问题。

我解决的第一个问题是执行查询通常会将结果缓冲到内存中。我使用 fetchmany() 方法。

engine = sqlalchemy.create_engine(db_url)
engine.execution_options(stream_results=True)

results=engine.execute('SELECT * FROM tableX;')
while True:
chunk = result.fetchmany(10000)
if not chunk:
break

另一方面,我有一个 StringIO 缓冲区,我将它与 fetchmany 数据检查一起提供。然后我将它的内容发送到 s3。

from io import StringIO
import boto3
import csv

s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())

我遇到的问题本质上是一个设计问题,我如何让这些部分协同工作。在同一运行时甚至可能吗?

engine = sqlalchemy.create_engine(db_url)
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')

engine.execution_options(stream_results=True)
results=engine.execute('SELECT * FROM tableX;')
while True:
chunk = result.fetchmany(10000)
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
if not chunk:
break

我可以让它在 fetchmany 的一个周期内工作,但不能在多个周期内工作。有什么想法吗?

最佳答案

我假设“让这些部分协同工作”是指您想要 S3 中的单个文件,而不只是部分?您需要做的就是创建一个文件对象,该文件对象在读取时将为下一批发出查询并对其进行缓冲。我们可以利用 python 的生成器:

def _generate_chunks(engine):
with engine.begin() as conn:
conn = conn.execution_options(stream_results=True)
results = conn.execute("")
while True:
chunk = results.fetchmany(10000)
if not chunk:
break
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
yield csv_buffer.getvalue().encode("utf-8")

这是文件 block 的流,所以我们需要做的就是将它们拼接在一起(当然是懒惰地)到一个文件对象中:

class CombinedFile(io.RawIOBase):
def __init__(self, strings):
self._buffer = ""
self._strings = iter(strings)

def read(self, size=-1):
if size < 0:
return self.readall()
if not self._buffer:
try:
self._buffer = next(self._strings)
except StopIteration:
pass
if len(self._buffer) > size:
ret, self._buffer = self._buffer[:size], self._buffer[size:]
else:
ret, self._buffer = self._buffer, b""
return ret

chunks = _generate_chunks(engine)
file = CombinedFile(chunks)
upload_file_object_to_s3(file)

将文件对象流式传输到 S3 留给读者作为练习。 (您可能可以使用 put_object 。)

关于database - 内存高效的大型数据集流式传输到 S3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47596051/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com