
python - SqlAlchemy + Celery with Scoped Session error


I'm trying to run a celery_beat job that kicks off a bunch of parallel jobs, but I'm getting this error: ResourceClosedError: This result object does not return rows. It has been closed automatically.
Here are the relevant files. Note that I'm using scoped_session:

# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine(SETTINGS['DATABASE_URL'], pool_recycle=3600, pool_size=10)
db_session = scoped_session(sessionmaker(
    autocommit=False, autoflush=False, bind=engine))

# tasks.py
from sqlalchemy import exists

from db import db_session
# (the Celery `app` and the RSSSummary model are imported elsewhere;
# those imports are omitted in the original question)

@app.task
def db_task(pid):
    db_session()
    r = db_session.query(exists().where(RSSSummary.id == pid)).scalar()

    print(pid, r)
    db_session.remove()


@app.task
def sched_test():
    ids = [0, 1]

    db_task.delay(ids[0])
    db_task.delay(ids[1])

Then, when I try to kick off sched_test like this:

>>> tasks.sched_test.delay()
DatabaseError: (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq

ResourceClosedError: This result object does not return rows. It has been closed automatically.
I believe I am using scoped_sessions correctly.

Any suggestions?

Best Answer

I was hitting the same error, along with a few others like:

DatabaseError: server sent data ("D" message) without prior row description ("T" message)
lost synchronization with server: got message type "�", length -1244613424

DatabaseError: lost synchronization with server: got message type "0", length 842674226

It turned out this was happening because my Celery worker processes were sharing SQLAlchemy connections. The SQLAlchemy docs address this directly:

It’s critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process. TCP connections are represented as file descriptors, which usually work across process boundaries, meaning this will cause concurrent access to the file descriptor on behalf of two or more entirely independent Python interpreter states.
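
As a minimal sketch of the pattern those pooling docs describe (separate from the Celery-specific fix below): a forked child disposes of the inherited pool before touching the database, so the engine opens fresh connections owned solely by that child. The engine URL and the do_work function here are illustrative assumptions, not part of the original question:

import multiprocessing

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://localhost/mydb")  # hypothetical DSN

def do_work():
    # Discard connections inherited from the parent; the pool will
    # lazily open new ones that belong to this child process only.
    engine.dispose()
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))

if __name__ == "__main__":
    p = multiprocessing.Process(target=do_work)
    p.start()
    p.join()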



I resolved this by using Celery's worker_process_init signal to invalidate all of the pool's existing connections when each worker process starts:

from celery.signals import worker_process_init

@worker_process_init.connect
def prep_db_pool(**kwargs):
    """
    When Celery forks the parent process, the db engine & connection pool is
    included in that. But the db connections should not be shared across
    processes, so we tell the engine to dispose of all existing connections,
    which will cause new ones to be opened in the child processes as needed.
    More info: https://docs.sqlalchemy.org/en/latest/core/pooling.html#using-connection-pools-with-multiprocessing
    """
    # The "with" here is for a Flask app using Flask-SQLAlchemy. If you don't
    # have a Flask app, just remove the "with" here and call .dispose()
    # on your SQLAlchemy db engine.
    with some_flask_app.app_context():
        db.engine.dispose()
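
If you are not using Flask, the handler reduces to the engine disposal itself. A minimal sketch, assuming the plain SQLAlchemy engine defined in db.py in the question:

from celery.signals import worker_process_init

from db import engine  # the engine created in db.py above

@worker_process_init.connect
def prep_db_pool(**kwargs):
    # Discard connections inherited from the parent process; the pool
    # reopens fresh ones lazily in this worker process.
    engine.dispose()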

Regarding python - SqlAlchemy + Celery with Scoped Session error, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43944787/
