- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个 Flask应用 RESTful应用程序接口(interface)。其中一个 API 调用是带有 JSON 负载的“批量更新插入”调用。我正在为性能而苦苦挣扎。
我尝试的第一件事是使用 merge-result
在 Query
对象上,因为...
This is an optimized method which will merge all mapped instances, preserving the structure of the result rows and unmapped columns with less method overhead than that of calling Session.merge() explicitly for each value.
这是初始代码:
class AdminApiUpdateTasks(Resource):
"""Bulk task creation / update endpoint"""
def put(self, slug):
taskdata = json.loads(request.data)
existing = db.session.query(Task).filter_by(challenge_slug=slug)
existing.merge_result(
[task_from_json(slug, **task) for task in taskdata])
db.session.commit()
return {}, 200
向该端点发出约 5000 条记录的请求,所有这些记录都已存在于数据库中,需要超过 11m 才能返回:
real 11m36.459s
user 0m3.660s
sys 0m0.391s
因为这是一个相当典型的用例,所以我开始寻找提高性能的替代方案。与我更好的判断相反,我试图merge
每条记录的 session :
class AdminApiUpdateTasks(Resource):
"""Bulk task creation / update endpoint"""
def put(self, slug):
# Get the posted data
taskdata = json.loads(request.data)
for task in taskdata:
db.session.merge(task_from_json(slug, **task))
db.session.commit()
return {}, 200
令我惊讶的是,结果速度是原来的两倍多:
real 4m33.945s
user 0m3.608s
sys 0m0.258s
我有两个问题:
merge
的第二个策略比使用 merge_result
的假设优化的第一个策略更快?最佳答案
这是一个老问题,但我希望这个答案仍然可以帮助人们。
我使用了与this相同的想法SQLAlchemy 设置的示例,但我添加了执行 UPSERT(如果存在则插入,否则更新现有记录)操作的基准测试。我在下面的 PostgreSQL 11 数据库中添加了结果:
Tests to run: test_customer_individual_orm_select, test_customer_batched_orm_select, test_customer_batched_orm_select_add_all, test_customer_batched_orm_merge_result
test_customer_individual_orm_select : UPSERT statements via individual checks on whether objects exist and add new objects individually (10000 iterations); total time 9.359603 sec
test_customer_batched_orm_select : UPSERT statements via batched checks on whether objects exist and add new objects individually (10000 iterations); total time 1.553555 sec
test_customer_batched_orm_select_add_all : UPSERT statements via batched checks on whether objects exist and add new objects in bulk (10000 iterations); total time 1.358680 sec
test_customer_batched_orm_merge_result : UPSERT statements using batched merge_results (10000 iterations); total time 7.191284 sec
如您所见,merge-result 远非最有效的选项。我建议分批检查结果是否存在以及是否应该更新。希望这对您有所帮助!
"""
This series of tests illustrates different ways to UPSERT
or INSERT ON CONFLICT UPDATE a large number of rows in bulk.
"""
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from profiler import Profiler
Base = declarative_base()
engine = None
class Customer(Base):
__tablename__ = "customer"
id = Column(Integer, primary_key=True)
name = Column(String(255))
description = Column(String(255))
Profiler.init("bulk_upserts", num=100000)
@Profiler.setup
def setup_database(dburl, echo, num):
global engine
engine = create_engine(dburl, echo=echo)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
s = Session(engine)
for chunk in range(0, num, 10000):
# Insert half of the customers we want to merge
s.bulk_insert_mappings(
Customer,
[
{
"id": i,
"name": "customer name %d" % i,
"description": "customer description %d" % i,
}
for i in range(chunk, chunk + 10000, 2)
],
)
s.commit()
@Profiler.profile
def test_customer_individual_orm_select(n):
"""
UPSERT statements via individual checks on whether objects exist
and add new objects individually
"""
session = Session(bind=engine)
for i in range(0, n):
customer = session.query(Customer).get(i)
if customer:
customer.description += "updated"
else:
session.add(Customer(
id=i,
name=f"customer name {i}",
description=f"customer description {i} new"
))
session.flush()
session.commit()
@Profiler.profile
def test_customer_batched_orm_select(n):
"""
UPSERT statements via batched checks on whether objects exist
and add new objects individually
"""
session = Session(bind=engine)
for chunk in range(0, n, 1000):
customers = {
c.id: c for c in
session.query(Customer)\
.filter(Customer.id.between(chunk, chunk + 1000))
}
for i in range(chunk, chunk + 1000):
if i in customers:
customers[i].description += "updated"
else:
session.add(Customer(
id=i,
name=f"customer name {i}",
description=f"customer description {i} new"
))
session.flush()
session.commit()
@Profiler.profile
def test_customer_batched_orm_select_add_all(n):
"""
UPSERT statements via batched checks on whether objects exist
and add new objects in bulk
"""
session = Session(bind=engine)
for chunk in range(0, n, 1000):
customers = {
c.id: c for c in
session.query(Customer)\
.filter(Customer.id.between(chunk, chunk + 1000))
}
to_add = []
for i in range(chunk, chunk + 1000):
if i in customers:
customers[i].description += "updated"
else:
to_add.append({
"id": i,
"name": "customer name %d" % i,
"description": "customer description %d new" % i,
})
if to_add:
session.bulk_insert_mappings(
Customer,
to_add
)
to_add = []
session.flush()
session.commit()
@Profiler.profile
def test_customer_batched_orm_merge_result(n):
"UPSERT statements using batched merge_results"
session = Session(bind=engine)
for chunk in range(0, n, 1000):
customers = session.query(Customer)\
.filter(Customer.id.between(chunk, chunk + 1000))
customers.merge_result(
Customer(
id=i,
name=f"customer name {i}",
description=f"customer description {i} new"
) for i in range(chunk, chunk + 1000)
)
session.flush()
session.commit()
关于python - 为 SQLAlchemy 批量更新插入寻找更好的策略,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23877702/
题: 是否有一种简单的方法可以获取正在运行的应用程序中泄漏的资源类型列表? IOW 通过连接到应用程序? 我知道 memproof 可以做到,但它会减慢速度,以至于应用程序甚至无法持续一分钟。大多数任
正确地说下面的代码会将自定义日志发送到.net核心中的Docker容器的stdout和stderr吗? console.Writeline(...) console.error(..) 最佳答案 如果
我想将一个任务多次重复,放入 for 循环中。我必须将时间序列对象存储为 IExchangeItem , openDA 中的一个特殊类(数据同化软件)。 这是任务之一(有效): HashMap ite
我需要从文件中读取一个数组。该数组在文件中不是连续排序的,必须跳转“偏移”字节才能获得下一个元素。假设我读取一个非常大的文件,什么更有效率。 1) 使用增量相对位置。 2)使用绝对位置。 选项 1:
我有一个安装程序(使用 Advanced Installer 制作)。我有一个必须与之交互的应用程序,但我不知道如何找到该安装的 MSIHANDLE。我查看了 Microsoft 引用资料,但没有发现
我在替换正则表达式中的“joe.”等内容时遇到问题。这是代码 var objects = new Array("joe","sam"); code = "joe.id was here so was
我有 A 类。A 类负责管理 B 对象的生命周期,它包含 B 对象的容器,即 map。 ,每个 B 对象都包含 C 对象的容器,即 map .我有一个全局 A 对象用于整个应用程序。 我有以下问题:我
任何人都可以告诉我在哪里可以找到 freeImage.so 吗?我一直在努力寻找相同的东西但没有成功..任何帮助将不胜感激。我已经尝试将 freeimage.a 转换为 freeImage .so 并
在单元测试期间,我想将生成的 URL 与测试中定义的静态 URL 进行比较。对于此比较,最好有一个 TestCase.assertURLEqual 或类似的,它可以让您比较两个字符串格式的 URL,如
'find ./ -name *.jpg' 我正在尝试优化上述语句的“查找”命令。 在查找实现中处理“-name”谓词的方法。 static boolean pred__name __common (
请原谅我在这里的困惑,但我已经阅读了关于 python 中的 seek() 函数的文档(在不得不使用它之后),虽然它帮助了我,但我仍然对它的实际含义有点困惑,任何非常感谢您的解释,谢谢。 最佳答案 关
我在我正在使用的库中找到了这个语句。它应该检查集群中的当前节点是否是领导者。这是语句:(!(cluster.Leader?.IsRemote ?? true)) 为什么不直接使用 (cluster.L
我发现 JsonParser 在 javax.json.stream 中,但我不知道在哪里可以找到它。谁能帮帮我? https://docs.oracle.com/javaee/7/api/javax
关闭。这个问题需要更多focused .它目前不接受答案。 想改善这个问题吗?更新问题,使其仅关注一个问题 editing this post . 6年前关闭。 Improve this questi
如果 git 存储库中有新的更改可用,我有一个多分支管道作业设置为每分钟由 Jenkinsfile 构建。如果分支名称是某种格式,我有一个将工件部署到环境的步骤。我希望能够在每个分支的基础上配置环境,
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 6年前关闭。 Improve thi
我想我刚刚意识到当他们不让我使用 cfdump 时我的网络主机是多么的限制。这其实有点让我生气,真的,dump 有什么害处?无论如何,我的问题是是否有人编写了一个 cfdump 替代方案来剔除复杂类型
任务:我有多个资源需要在一个 HTTP 调用中更新。 要更新的资源类型、字段和值对于所有资源都是相同的。 示例:通过 ID 设置了一组汽车,需要将所有汽车的“状态”更新为“已售出”。 经典 RESTF
场景:表中有 2 列,数据如下例所示。对于“a”列的相同值,该表可能有多个行。 在示例中,考虑到“a”列,“1”有三行,“2”有一行。 示例表“t1”: |a|b ||1|1.1||1|1.2||1
我有一个数据框: Date Price 2021-01-01 29344.67 2021-01-02 32072.08 2021-01-03 33048.03 2021-01-04 32084.
我是一名优秀的程序员,十分优秀!