gpt4 book ai didi

python - SQLAlchemy 在多线程应用程序中正确的 session 处理

转载 作者:IT老高 更新时间:2023-10-28 20:39:04 30 4
gpt4 key购买 nike

我很难理解如何有效地正确打开和关闭数据库 session ,正如我在 sqlalchemy 文档中所理解的那样,如果我使用 scoped_session 构造我的 Session 对象,然后使用返回的 Session 对象来创建 session ,它是线程安全的,所以基本上每个线程都会得到它自己的 session ,并且不会有问题。现在下面的例子工作了,我把它放在一个无限循环中,看看它是否正确关闭了 session ,如果我正确监控它(在 mysql 中通过执行“SHOW PROCESSLIST;”),连接只是不断增长,它不会关闭它们,即使我使用了 session.close(),甚至在每次运行结束时删除了 scoped_session 对象。我究竟做错了什么?我在大型应用程序中的目标是使用所需的最少数据库连接数,因为我当前的工作实现在需要它的每个方法中创建一个新 session 并在返回之前将其关闭,这似乎效率低下。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel


DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'


class MTWorker(object):

def __init__(self, worker_count=5):
self.task_queue = Queue()
self.worker_count = worker_count
self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
self.DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=self.db_engine
)
)

def _worker(self):
db_session = self.DBSession()
while True:
try:
task_id = self.task_queue.get(False)
try:
item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
# do something with item
except Exception as exc:
# if an error occurrs we skip it
continue

finally:
db_session.commit()
self.task_queue.task_done()
except QueueEmpty:
db_session.close()
return

def start(self):
try:
db_session = self.DBSession()
all_items = db_session.query(MyModel).all()
for item in all_items:
self.task_queue.put(item.id)

for _i in range(self.worker_count):
t = Thread(target=self._worker)
t.start()

self.task_queue.join()
finally:
db_session.close()
self.DBSession.remove()


if __name__ == '__main__':
while True:
mt_worker = MTWorker(worker_count=50)
mt_worker.start()

最佳答案

您应该只调用一次 create_enginescoped_session进程(每个数据库)。每个人都会有自己的连接或 session 池(分别),所以你要确保你只创建 one 池。只需将其设为模块级全局即可。如果您需要比这更精确地管理 session ,您可能不应该使用 scoped_session

另一个需要做的改变是直接使用 DBSession,就好像它是一个 session 。在 scoped_session 上调用 session 方法将透明如果需要,创建一个线程本地 session ,并将方法调用转发到 session 。

另一件需要注意的是 pool_size 的连接池,其中默认为 5。对于许多应用程序来说,这很好,但如果您正在创建很多线程,你可能需要调整那个参数

DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=db_engine
)
)


class MTWorker(object):

def __init__(self, worker_count=5):
self.task_queue = Queue()
self.worker_count = worker_count
# snip

关于python - SQLAlchemy 在多线程应用程序中正确的 session 处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9619789/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com