gpt4 book ai didi

python - SQLAlchemy 奇怪的线程行为

转载 作者:行者123 更新时间:2023-12-01 09:24:08 25 4
gpt4 key购买 nike

考虑使用 Python 3.6.5 和 SQLAlchemy 1.2.7 的以下示例代码

import threading
from concurrent.futures import ThreadPoolExecutor

from sqlalchemy import create_engine, Column, Integer, Boolean
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session, Session

engine = create_engine("sqlite:///threading_sqlalchemy.db")
base = declarative_base(engine)
smaker = sessionmaker(engine)
scopedmaker: scoped_session = scoped_session(smaker)

dblock = threading.Lock()


class Key(base):
__tablename__ = "Key"
id = Column(Integer, primary_key=True)
value = Column(Integer, nullable=False, unique=True, index=True)
taken = Column(Boolean, nullable=False, default=False)

def __repr__(self):
return f"<Key id={self.id}, value={self.value}, taken={self.taken}>"


try:
Key.__table__.drop()
# this is also quite funny, if the table doesn't exist it throws:
# sqlite3.OperationalError: no such table: Key
# when there is literally a sqlalchemy.exc.NoSuchTableError
except OperationalError:
pass
base.metadata.create_all()


def gen_keys(n):
print(f"made in {threading.current_thread()}")
with dblock:
session: Session = scopedmaker()
session.bulk_save_objects([Key(value=i * 100) for i in range(0, n)])
session.commit()


def take_keys(n):
print(f"used in {threading.current_thread()}")
with dblock:
session: Session = scopedmaker()
keys = session.query(Key).filter(Key.taken == False).limit(n).all()
for key in keys:
key.taken = True
print(keys)
session.commit()


def take_keys_2(n):
print(f"used in {threading.current_thread()}")
with dblock:
session: Session = scopedmaker()
keys = session.query(Key).filter(Key.taken == False).limit(n).all()
for key in keys:
key.taken = True
session.commit()
print(keys)


gen_keys(100)

# take_keys works just as expected
with ThreadPoolExecutor() as executor:
for _ in range(0, 5):
executor.submit(take_keys, 10)

# take_keys_2 breaks, raises following error
# >>> sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.
# >>> The object was created in thread id 12340 and this is thread id 4312
# according to the console log, 12340 is one of the ThreadPoolExecutor threads, and 4312 is the main thread
with ThreadPoolExecutor() as executor:
for _ in range(0, 5):
executor.submit(take_keys_2, 10)

我只有一个非常简单的类(class) Key它有一个值,可以标记为 taken ,想象一下类似赠品的东西,您不想将同一个赠品分发给不同的潜在客户。我用它来测试确实存在的竞争条件,并迫使我在数据库访问上使用锁,没什么大不了的,我可以忍受。

我真正不明白的是为什么take_keys有效,但是 take_keys_2当它们之间唯一的区别是 print(keys) 的位置时,就会中断。陈述。特别是因为在非功能性示例中,错误消息似乎是我在错误的线程中使用创建的对象(我不是,我只是在创建它的同一线程中的 session.commit() 之后使用它。

如果有人能解释为什么会发生这种情况,我会很高兴。

最佳答案

现在,我还没有掌握所有详细信息,但足以让您了解您的情况。 Threading support in SQLite不太好。因此,如果使用内存数据库,SQLAlchemy 的池行为默认为 SingletonThreadPool;如果使用文件,则默认为 NullPool。后者意味着根本没有池化,或者换句话说,连接始终根据请求打开和关闭。

print() 的位置很重要,因为上面对 session.commit() 的调用会使 session 中对象的所有数据库加载状态过期。因此,为了打印键列表(最终调用它们的__repr__),SQLAlchemy 必须重新获取每个对象的状态。如果您将 echo=True 添加到对 create_engine() 的调用中,这一点就会变得很明显。

毕竟,您在 take_keys_2 中的 session 正在与开放事务保持连接。这就是它变得有点困惑的地方:当函数退出时, session 超出范围,这意味着它所持有的连接最终会返回到池中。但该池是一个 NullPool,因此它最终确定并关闭连接并丢弃它。最终确定意味着回滚任何打开的事务,这就是失败的原因:

Traceback (most recent call last):
File "~/Work/sqlalchemy/lib/sqlalchemy/pool.py", line 705, in _finalize_fairy
fairy._reset(pool)
File "~/Work/sqlalchemy/lib/sqlalchemy/pool.py", line 876, in _reset
pool._dialect.do_rollback(self)
File "~/Work/sqlalchemy/lib/sqlalchemy/engine/default.py", line 457, in do_rollback
dbapi_connection.rollback()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140683561543424 and this is thread id 140683635291968

最终确定是在解释器关闭期间在“虚拟”线程中执行的,而不是在工作线程中执行的,因为连接一直处于延迟状态。

例如,如果您在 print(keys) 之后添加对 session.rollback() 的调用:

def take_keys_2(n):
...
with dblock:
...
session.commit()
print(keys)
session.rollback()

the connection is returned to the pool explicitly ,并且 take_keys_2 也可以工作。另一种选择是使用 expire_on_commit=False,这样提交后就不需要额外的查询来打印 Key 对象的表示:

def take_keys_2(n):
with dblock:
session: Session = scopedmaker(expire_on_commit=False)
...

关于python - SQLAlchemy 奇怪的线程行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50578890/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com