- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我的 SQL 查询可以非常简单地写成:
result = session.query(Table).filter(Table.my_key._in(key_set))
my_key
整数列被索引(主键),但 key_set
可能确实非常大,有数千万个值。
对于如此庞大的集合进行过滤,推荐的 SQLAlchemy 模式是什么?
有没有比行人更高效的内置东西:
result = [session.query(Table).get(key) for key in key_set]
最佳答案
在这种极端情况下,您最好首先考虑推荐的 SQL 解决方案是什么,然后在 SQLAlchemy 中实现它——如果需要,甚至可以使用原始 SQL。一种这样的解决方案是为 key_set
数据创建一个临时表并填充它。
为了测试类似您的设置的东西,我创建了以下模型
class Table(Base):
__tablename__ = 'mytable'
my_key = Column(Integer, primary_key=True)
并用 20,000,000 行填充它:
In [1]: engine.execute("""
...: insert into mytable
...: select generate_series(1, 20000001)
...: """)
我还创建了一些帮助程序来测试临时表、填充和查询的不同组合。请注意,查询使用核心表,以绕过 ORM 及其机制——无论如何,对计时的贡献将是恒定的:
# testdb is just your usual SQLAlchemy imports, and some
# preconfigured engine options.
from testdb import *
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Executable, ClauseElement
from io import StringIO
from itertools import product
class Table(Base):
__tablename__ = "mytable"
my_key = Column(Integer, primary_key=True)
def with_session(f):
def wrapper(*a, **kw):
session = Session(bind=engine)
try:
return f(session, *a, **kw)
finally:
session.close()
return wrapper
def all(_, query):
return query.all()
def explain(analyze=False):
def cont(session, query):
results = session.execute(Explain(query.statement, analyze))
return [l for l, in results]
return cont
class Explain(Executable, ClauseElement):
def __init__(self, stmt, analyze=False):
self.stmt = stmt
self.analyze = analyze
@compiles(Explain)
def visit_explain(element, compiler, **kw):
stmt = "EXPLAIN "
if element.analyze:
stmt += "ANALYZE "
stmt += compiler.process(element.stmt, **kw)
return stmt
def create_tmp_tbl_w_insert(session, key_set, unique=False):
session.execute("CREATE TEMPORARY TABLE x (k INTEGER NOT NULL)")
x = table("x", column("k"))
session.execute(x.insert().values([(k,) for k in key_set]))
if unique:
session.execute("CREATE UNIQUE INDEX ON x (k)")
session.execute("ANALYZE x")
return x
def create_tmp_tbl_w_copy(session, key_set, unique=False):
session.execute("CREATE TEMPORARY TABLE x (k INTEGER NOT NULL)")
# This assumes that the string representation of the Python values
# is a valid representation for Postgresql as well. If this is not
# the case, `cur.mogrify()` should be used.
file = StringIO("".join([f"{k}\n" for k in key_set]))
# HACK ALERT, get the DB-API connection object
with session.connection().connection.connection.cursor() as cur:
cur.copy_from(file, "x")
if unique:
session.execute("CREATE UNIQUE INDEX ON x (k)")
session.execute("ANALYZE x")
return table("x", column("k"))
tmp_tbl_factories = {
"insert": create_tmp_tbl_w_insert,
"insert (uniq)": lambda session, key_set: create_tmp_tbl_w_insert(session, key_set, unique=True),
"copy": create_tmp_tbl_w_copy,
"copy (uniq)": lambda session, key_set: create_tmp_tbl_w_copy(session, key_set, unique=True),
}
query_factories = {
"in": lambda session, _, x: session.query(Table.__table__).
filter(Table.my_key.in_(x.select().as_scalar())),
"exists": lambda session, _, x: session.query(Table.__table__).
filter(exists().where(x.c.k == Table.my_key)),
"join": lambda session, _, x: session.query(Table.__table__).
join(x, x.c.k == Table.my_key)
}
tests = {
"test in": (
lambda _s, _ks: None,
lambda session, key_set, _: session.query(Table.__table__).
filter(Table.my_key.in_(key_set))
),
"test in expanding": (
lambda _s, _kw: None,
lambda session, key_set, _: session.query(Table.__table__).
filter(Table.my_key.in_(bindparam('key_set', key_set, expanding=True)))
),
**{
f"test {ql} w/ {tl}": (tf, qf)
for (tl, tf), (ql, qf)
in product(tmp_tbl_factories.items(), query_factories.items())
}
}
@with_session
def run_test(session, key_set, tmp_tbl_factory, query_factory, *, cont=all):
x = tmp_tbl_factory(session, key_set)
return cont(session, query_factory(session, key_set, x))
对于小键集,您拥有的简单 IN
查询与其他查询一样快,但是使用 100,000 的 key_set
时,涉及更多的解决方案开始获胜:
In [10]: for test, steps in tests.items():
...: print(f"{test:<28}", end=" ")
...: %timeit -r2 -n2 run_test(range(100000), *steps)
...:
test in 2.21 s ± 7.31 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in expanding 630 ms ± 929 µs per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ insert 1.83 s ± 3.73 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ insert 1.83 s ± 3.99 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ insert 1.86 s ± 3.76 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ insert (uniq) 1.87 s ± 6.67 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ insert (uniq) 1.84 s ± 125 µs per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ insert (uniq) 1.85 s ± 2.8 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ copy 246 ms ± 1.18 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ copy 243 ms ± 2.31 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ copy 258 ms ± 3.05 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test in w/ copy (uniq) 261 ms ± 1.39 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test exists w/ copy (uniq) 267 ms ± 8.24 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
test join w/ copy (uniq) 264 ms ± 1.16 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
将 key_set
提高到 1,000,000:
In [11]: for test, steps in tests.items():
...: print(f"{test:<28}", end=" ")
...: %timeit -r2 -n1 run_test(range(1000000), *steps)
...:
test in 23.8 s ± 158 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in expanding 6.96 s ± 3.02 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ insert 19.6 s ± 79.3 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ insert 20.1 s ± 114 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ insert 19.5 s ± 7.93 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ insert (uniq) 19.5 s ± 45.4 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ insert (uniq) 19.6 s ± 73.6 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ insert (uniq) 20 s ± 57.5 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ copy 2.53 s ± 49.9 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ copy 2.56 s ± 1.96 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ copy 2.61 s ± 26.8 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test in w/ copy (uniq) 2.63 s ± 3.79 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
test exists w/ copy (uniq) 2.61 s ± 916 µs per loop (mean ± std. dev. of 2 runs, 1 loop each)
test join w/ copy (uniq) 2.6 s ± 5.31 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
10,000,000 个 key 集,仅COPY
解决方案,因为其他解决方案占用了我所有的 RAM 并且在被杀死之前正在经历交换,暗示他们永远不会在这台机器上完成:
In [12]: for test, steps in tests.items():
...: if "copy" in test:
...: print(f"{test:<28}", end=" ")
...: %timeit -r1 -n1 run_test(range(10000000), *steps)
...:
test in w/ copy 28.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test exists w/ copy 29.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test join w/ copy 29.7 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test in w/ copy (uniq) 28.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test exists w/ copy (uniq) 27.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
test join w/ copy (uniq) 28.4 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
因此,对于小型 key 集(~100,000 或更少),使用什么并不重要,尽管与易用性相比,使用扩展 bindparam
在时间上明显是赢家,但是对于更大的集合,您可能需要考虑使用临时表和COPY
。
值得注意的是,对于大型集合,如果使用唯一索引,查询计划是相同的:
In [13]: print(*run_test(range(10000000),
...: tmp_tbl_factories["copy (uniq)"],
...: query_factories["in"],
...: cont=explain()), sep="\n")
Merge Join (cost=45.44..760102.11 rows=9999977 width=4)
Merge Cond: (mytable.my_key = x.k)
-> Index Only Scan using mytable_pkey on mytable (cost=0.44..607856.88 rows=20000096 width=4)
-> Index Only Scan using x_k_idx on x (cost=0.43..303939.09 rows=9999977 width=4)
In [14]: print(*run_test(range(10000000),
...: tmp_tbl_factories["copy (uniq)"],
...: query_factories["exists"],
...: cont=explain()), sep="\n")
Merge Join (cost=44.29..760123.36 rows=9999977 width=4)
Merge Cond: (mytable.my_key = x.k)
-> Index Only Scan using mytable_pkey on mytable (cost=0.44..607856.88 rows=20000096 width=4)
-> Index Only Scan using x_k_idx on x (cost=0.43..303939.09 rows=9999977 width=4)
In [15]: print(*run_test(range(10000000),
...: tmp_tbl_factories["copy (uniq)"],
...: query_factories["join"],
...: cont=explain()), sep="\n")
Merge Join (cost=39.06..760113.29 rows=9999977 width=4)
Merge Cond: (mytable.my_key = x.k)
-> Index Only Scan using mytable_pkey on mytable (cost=0.44..607856.88 rows=20000096 width=4)
-> Index Only Scan using x_k_idx on x (cost=0.43..303939.09 rows=9999977 width=4)
由于测试表是人工的,因此可以使用仅索引扫描。
最后,这里是“行人”方式的计时,粗略对比一下:
In [3]: for ksl in [100000, 1000000]:
...: %time [session.query(Table).get(k) for k in range(ksl)]
...: session.rollback()
...:
CPU times: user 1min, sys: 1.76 s, total: 1min 1s
Wall time: 1min 13s
CPU times: user 9min 48s, sys: 17.3 s, total: 10min 5s
Wall time: 12min 1s
问题是使用 Query.get()
必然包括 ORM,而原来的比较没有。尽管如此,即使在使用本地数据库时,单独往返数据库的成本还是很明显的。
关于python - 非常大的集合的 SQLAlchemy 集合成员资格,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56761442/
我正在尝试创建一个使用 UUID 作为主键的用户模型: from src.db import db # SQLAlchemy instance import sqlalchemy_utils impo
在 sqlalchemy 中,我试图合并表,然后使用 WHERE 和 ORDER_BY 创建别名 有点像 SELECT * FROM ( SELECT [TABLE_ONE].[SOME_ID]
我正在使用 SQL Alchemy(通过 Flask_sqlalchemy)将 Python 字典列表插入到 Postgres 数据库中。 其中一个表是所有唯一项目的列表(表 1),而第二个是与某个项
This source详细说明如何使用关联代理创建具有 ORM 对象值的 View 和对象。 但是,当我附加一个与数据库中现有对象匹配的值(并且所述值是唯一的或主键)时,它会创建一个冲突的对象,因此我
SQLAlchemy Core和SQLAlchemy ORM的目的有什么区别? 最佳答案 顾名思义,ORM是一个对象关系映射器:其目的是将数据库关系表示为Python对象。 核心是查询构建器。其目的是
带有ForeignKey的Column是否自动创建索引? 还是我需要手动添加index=True? some_field = Column(Integer, ForeignKey(SomeModel.
我有一个主数据库,每个客户自己的数据库连接存储在其中。 因此,每个客户端都使用2个db:main和它自己的db,必须确定其连接 对于每个http调用。我如何使用flask-sqlalchemy扩展名执
当我仅对类进行继承时,它才起作用 class User(Base): __tablename__ = ’users’ id = Column(Integer, primary_key=
从用户的角度来看,SQLAlchemy 的查询日志似乎有点过于冗长,有时甚至有点神秘: 2015-10-02 13:51:39,500 INFO sqlalchemy.engine.base.Engi
我正在尝试使用 wtforms.ext.sqlalchemy QuerySelectMultipleField 显示复选框列表,但我无法在 GET 的表单上显示模型数据。 这是我的models.py
我想为查询返回一个中继连接。使用标准的 graphene-sqlalchemy 你可以这样做: class Query(graphene.ObjectType): node = relay.N
我在 centos 7.5 虚拟机上部署了最新的 Airflow ,并将 sql_alchemy_conn 和 result_backend 更新到 postgresql 实例上的 postgres
我想将多个项目插入到一个表中,并在发生冲突时更新该表。这是我想出的以下内容 from sqlalchemy.dialects.postgresql import insert meta = MetaD
我有以下模型: class Item(Base): a = relationship() b = relationship() c = relationship() d
我有 presto 和 superset 设置。 presto 运行良好,可以通过命令访问: ./app/hadoop/setjdk8.sh;bin/presto-cli --server http:
我一直在寻找一种在 sqlalchemy 中使用 tsvector 的方法(就像 INTEGER 等其他方法一样),但到目前为止我还不清楚如何做到这一点。我读过可以使用 UserDefinedType
我正在使用 sqlalchemy(现在使用 sqlite,但稍后可能会改变)来构建一个数据库,其中插入的顺序和 rowids 很重要。我基本上有以下几点: class Message(Base):
给定一个对象,我想知道如何知道它是否是 sqlalchemy 映射模型的实例。 通常,我会使用 isinstance(obj, DeclarativeBase)。但是,在这种情况下,我没有可用的 De
我已经通读了查询文档,如果有办法从查询中获取表名,就看不到任何地方 - 例如如果我有 q = query(Users) ,我可以得到Users从 q 退出? 最佳答案 请注意,像您这样的事件简单查询可
我不确定如何定义create schema foo迁移?我的模型如下所示(我正在使用Flask-Migrate): class MyTable(db.Model): __tablename__
我是一名优秀的程序员,十分优秀!