python - Looking for a better strategy for SQLAlchemy bulk upserts

I have a Flask application with a RESTful API. One of the API calls is a "mass upsert" call with a JSON payload, and I am struggling with its performance.

The first thing I tried was using merge_result on a Query object, because...

This is an optimized method which will merge all mapped instances, preserving the structure of the result rows and unmapped columns with less method overhead than that of calling Session.merge() explicitly for each value.

This was the initial code:

class AdminApiUpdateTasks(Resource):

    """Bulk task creation / update endpoint"""

    def put(self, slug):
        taskdata = json.loads(request.data)
        existing = db.session.query(Task).filter_by(challenge_slug=slug)
        existing.merge_result(
            [task_from_json(slug, **task) for task in taskdata])
        db.session.commit()
        return {}, 200

A request with ~5000 records, all of them already existing in the database, takes more than 11 minutes to return from this endpoint:

real    11m36.459s
user    0m3.660s
sys     0m0.391s

As this is a fairly typical use case, I started looking into alternatives to improve performance. Against my better judgement, I tried merging into the session one record at a time:

class AdminApiUpdateTasks(Resource):

    """Bulk task creation / update endpoint"""

    def put(self, slug):
        # Get the posted data
        taskdata = json.loads(request.data)
        for task in taskdata:
            db.session.merge(task_from_json(slug, **task))
        db.session.commit()
        return {}, 200

To my surprise, this turned out to be more than twice as fast:

real    4m33.945s
user    0m3.608s
sys     0m0.258s

I have two questions:

  1. Why is the second strategy using merge faster than the supposedly optimized first one using merge_result?
  2. What other strategies should I pursue, if any, to optimize this further?

Best Answer

This is an old question, but I hope this answer can still help people.

I used the same idea as this example set up by SQLAlchemy, but I added benchmarks for performing UPSERT (insert if the record exists, otherwise update the existing record) operations. The results on a PostgreSQL 11 database are below:

Tests to run: test_customer_individual_orm_select, test_customer_batched_orm_select, test_customer_batched_orm_select_add_all, test_customer_batched_orm_merge_result
test_customer_individual_orm_select : UPSERT statements via individual checks on whether objects exist and add new objects individually (10000 iterations); total time 9.359603 sec
test_customer_batched_orm_select : UPSERT statements via batched checks on whether objects exist and add new objects individually (10000 iterations); total time 1.553555 sec
test_customer_batched_orm_select_add_all : UPSERT statements via batched checks on whether objects exist and add new objects in bulk (10000 iterations); total time 1.358680 sec
test_customer_batched_orm_merge_result : UPSERT statements using batched merge_results (10000 iterations); total time 7.191284 sec

As you can see, merge_result is far from the most efficient option. I'd suggest checking in batches whether the records exist and should be updated. Hope this helps!

"""
This series of tests illustrates different ways to UPSERT
or INSERT ON CONFLICT UPDATE a large number of rows in bulk.
"""
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from profiler import Profiler


Base = declarative_base()
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))
    description = Column(String(255))


Profiler.init("bulk_upserts", num=100000)


@Profiler.setup
def setup_database(dburl, echo, num):
    global engine
    engine = create_engine(dburl, echo=echo)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

    s = Session(engine)
    for chunk in range(0, num, 10000):
        # Insert half of the customers we want to merge
        s.bulk_insert_mappings(
            Customer,
            [
                {
                    "id": i,
                    "name": "customer name %d" % i,
                    "description": "customer description %d" % i,
                }
                for i in range(chunk, chunk + 10000, 2)
            ],
        )
    s.commit()


@Profiler.profile
def test_customer_individual_orm_select(n):
    """
    UPSERT statements via individual checks on whether objects exist
    and add new objects individually
    """
    session = Session(bind=engine)
    for i in range(0, n):
        customer = session.query(Customer).get(i)
        if customer:
            customer.description += "updated"
        else:
            session.add(Customer(
                id=i,
                name=f"customer name {i}",
                description=f"customer description {i} new"
            ))
        session.flush()
    session.commit()

@Profiler.profile
def test_customer_batched_orm_select(n):
    """
    UPSERT statements via batched checks on whether objects exist
    and add new objects individually
    """
    session = Session(bind=engine)
    for chunk in range(0, n, 1000):
        customers = {
            c.id: c for c in
            session.query(Customer)
            .filter(Customer.id.between(chunk, chunk + 1000))
        }
        for i in range(chunk, chunk + 1000):
            if i in customers:
                customers[i].description += "updated"
            else:
                session.add(Customer(
                    id=i,
                    name=f"customer name {i}",
                    description=f"customer description {i} new"
                ))
        session.flush()
    session.commit()

@Profiler.profile
def test_customer_batched_orm_select_add_all(n):
    """
    UPSERT statements via batched checks on whether objects exist
    and add new objects in bulk
    """
    session = Session(bind=engine)
    for chunk in range(0, n, 1000):
        customers = {
            c.id: c for c in
            session.query(Customer)
            .filter(Customer.id.between(chunk, chunk + 1000))
        }
        to_add = []
        for i in range(chunk, chunk + 1000):
            if i in customers:
                customers[i].description += "updated"
            else:
                to_add.append({
                    "id": i,
                    "name": "customer name %d" % i,
                    "description": "customer description %d new" % i,
                })
        if to_add:
            session.bulk_insert_mappings(
                Customer,
                to_add
            )
            to_add = []
        session.flush()
    session.commit()

@Profiler.profile
def test_customer_batched_orm_merge_result(n):
    "UPSERT statements using batched merge_results"
    session = Session(bind=engine)
    for chunk in range(0, n, 1000):
        customers = session.query(Customer)\
            .filter(Customer.id.between(chunk, chunk + 1000))
        customers.merge_result(
            Customer(
                id=i,
                name=f"customer name {i}",
                description=f"customer description {i} new"
            ) for i in range(chunk, chunk + 1000)
        )
        session.flush()
    session.commit()
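
The module docstring above also mentions INSERT ON CONFLICT UPDATE, but none of the benchmarks use PostgreSQL's native upsert statement. As one more option for question 2, here is a minimal sketch (not part of the benchmark above; the upsert_customers helper and the chunk size of 1000 are my own assumptions) using the PostgreSQL dialect's insert() with on_conflict_do_update(), which lets the database resolve the insert-vs-update decision and avoids the extra SELECT round trip of the batched ORM variants:

# A minimal sketch, not part of the original benchmark: bulk upsert via
# PostgreSQL's native INSERT ... ON CONFLICT DO UPDATE. Assumes the same
# Customer model and engine as above; chunk_size=1000 is an arbitrary choice.
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session


def upsert_customers(session, rows, chunk_size=1000):
    """rows is a list of dicts with the Customer columns (id, name, description)."""
    table = Customer.__table__
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start:start + chunk_size]
        stmt = insert(table).values(chunk)
        stmt = stmt.on_conflict_do_update(
            # "id" is the primary key, so a conflict on it turns into an UPDATE
            index_elements=[table.c.id],
            set_={
                "name": stmt.excluded.name,
                "description": stmt.excluded.description,
            },
        )
        session.execute(stmt)
    session.commit()


# Example usage with made-up data:
# session = Session(bind=engine)
# upsert_customers(session, [
#     {"id": i, "name": f"customer name {i}",
#      "description": f"customer description {i} new"}
#     for i in range(0, 10000)
# ])

Whether this beats the batched ORM SELECT approach will depend on the data and the database, so it is worth benchmarking with the same Profiler harness before adopting it.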

The original question, "python - Looking for a better strategy for SQLAlchemy bulk upserts", can be found on Stack Overflow: https://stackoverflow.com/questions/23877702/
