gpt4 book ai didi

python - sqlalchemy 和 postgresql 中的大表

转载 作者:行者123 更新时间:2023-11-29 12:25:18 24 4
gpt4 key购买 nike

我在 postgresql 数据库中有一个大约 900,000 行的表。在转换每一行并将数据添加到新列之后,我想将它逐行复制到另一个带有一些额外列的表中。问题是 RAM 变满了。

这是代码的相关部分:

engine = sqlalchemy.create_engine(URL(**REMOTE), echo=False)
Session = sessionmaker(bind=engine)
session = Session()
n=1000
counter=1
for i in range(1,total+1,n):
ids=str([j for j in range(i,i+n)])
**q="SELECT * from table_parts where id in (ids)"%ids**
r=session.execute(q).fetchall()
for element in r:
data={}
....
[taking data from each row, extracting string,calculation,
and filling extra columns that the new table has]
...
query=query.bindparams(**data)
try:
session.execute(query)
except:
session.rollback()
raise
if counter%n==0:
print COMMITING....",counter,datetime.datetime.now("%H:%M:%S")
session.commit()
counter+=1

查询是正确的,所以那里没有错误。在我按下 Ctrl+C 之前,新表已正确更新。

问题似乎是查询:“SELECT * from table_parts where id in (1,2,3,4...1000)”我已经尝试过使用 postgresql 数组。

我已经尝试过的事情:

  • 结果 =(连接
    .execution_options(stream_results=True) # 添加了这一行
    .execute(查询))
    from here .据我所知,这在与 postgresql 一起使用时使用服务器端游标。我放弃了我发布的代码中的 session 对象并使用了 engine.connect()

    • 在每次迭代时创建一个新连接对象,令人惊讶的是这也不起作用。 RAM 已满

来自文档,

Note that the stream_results execution option is enabled automatically if the yield_per() method is used.

所以query api中的yield_per和上面提到的stream_result选项是一样的

谢谢

最佳答案

create table table_parts ( id serial primary key, data text );
-- Insert 1M rows of about 32kB data =~ 32GB of data
-- Needs only 0.4GB of disk space because of builtin compression
-- Might take a few minutes
insert into table_parts(data)
select rpad('',32*1024,'A') from generate_series(1,1000000);

以下使用 SQLAlchemy.Core 的代码不会占用大量内存:

import sqlalchemy
import datetime
import getpass

metadata = sqlalchemy.MetaData()
table_parts = sqlalchemy.Table('table_parts', metadata,
sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column('data', sqlalchemy.String)
)

engine = sqlalchemy.create_engine(
'postgresql:///'+getpass.getuser(),
echo=False
)
connection = engine.connect()

n = 1000

select_table_parts_n = sqlalchemy.sql.select([table_parts]).\
where(table_parts.c.id>sqlalchemy.bindparam('last_id')).\
order_by(table_parts.c.id).\
limit(n)

update_table_parts = table_parts.update().\
where(table_parts.c.id == sqlalchemy.bindparam('table_part_id')).\
values(data=sqlalchemy.bindparam('table_part_data'))

last_id=0
while True:
with connection.begin() as transaction:
row = None
for row in connection.execute(select_table_parts_n, last_id=last_id):
data = row.data.replace('A','B')
connection.execute(
update_table_parts,
table_part_id=row.id,
table_part_data=data
)
if not row:
break
else:
print "COMMITING {} {:%H:%M:%S}".\
format(row.id,datetime.datetime.now())
transaction.commit()
last_id=row.id

您似乎没有使用 ORM 功能,所以我想您也应该使用 SQLAlchemy.Core。

关于python - sqlalchemy 和 postgresql 中的大表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42764206/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com