gpt4 book ai didi

sqlalchemy - celery beat 和 sqlalchemy + pyramid app 出现 "ResourceClosedError: The transaction is closed"错误

转载 作者:行者123 更新时间:2023-12-04 16:09:58 27 4
gpt4 key购买 nike

我有一个名为 mainsite 的 Pyramid 应用程序。

该站点以相当异步的方式工作,主要是通过从 View 启动线程来执行后端操作。

它通过sqlalchemy连接到mysql,并使用ZopeTransactionExtension进行 session 管理。

到目前为止,该应用程序运行良好。

我需要在它上面运行定期作业,它需要使用一些从 View 中启动的相同异步函数。

我使用了 apscheduler 但遇到了问题。所以我想到了使用 celery beat 作为一个单独的进程,将 mainapp 视为一个库并导入要使用的函数。

我的 celery 配置如下所示:

from datetime import timedelta
from api.apiconst import RERUN_CHECK_INTERVAL, AUTOMATION_CHECK_INTERVAL, \
AUTH_DELETE_TIME

BROKER_URL = 'sqla+mysql://em:em@localhost/edgem'
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = 'mysql://em:em@localhost/edgem'

CELERYBEAT_SCHEDULE = {
'rerun': {
'task': 'tasks.rerun_scheduler',
'schedule': timedelta(seconds=RERUN_CHECK_INTERVAL)
},
'automate': {
'task': 'tasks.automation_scheduler',
'schedule': timedelta(seconds=20)
},
'remove-tokens': {
'task': 'tasks.token_remover_scheduler',
'schedule': timedelta(seconds=2 * 24 * 3600 )
},
}

CELERY_TIMEZONE = 'UTC'

tasks.py是

from celery import Celery
celery = Celery('tasks')
celery.config_from_object('celeryconfig')


@celery.task
def rerun_scheduler():
from mainsite.task import check_update_rerun_tasks
check_update_rerun_tasks()


@celery.task
def automation_scheduler():
from mainsite.task import automate
automate()


@celery.task
def token_remover_scheduler():
from mainsite.auth_service import delete_old_tokens
delete_old_tokens()

请记住,上述所有函数都会立即返回,但会在需要时启动线程

线程通过在 session.add(object) 之后执行 transaction.commit() 将对象保存到 db。

问题是整件事情像 gem 一样只工作了大约 30 分钟。在那之后,ResourceClosedError: The transaction is closed 错误开始发生在任何有 transaction.commit() 的地方。我不确定是什么问题,我需要帮助进行故障排除。

我在任务中导入的原因是为了摆脱这个错误。认为每次需要运行任务时导入是个好主意,我可能每次都会得到一个新事务,但看起来情况并非如此。

最佳答案

根据我的经验,尝试将配置为与 Pyramid(使用 ZopeTransactionExtension 等)一起使用的 session 与 Celery worker 重用会导致难以调试的可怕困惑。

ZopeTransactionExtension 将 SQLAlchemy session 绑定(bind)到 Pyramid 的请求-响应周期 - 事务自动启动并提交或回滚,您通常不应该在代码中使用 transaction.commit() - 如果一切正常,中兴通讯将提交所有内容,如果您的代码引发异常,您的事务将被回滚。

使用 Celery,您需要手动管理 SQLAlchemy session ,而中兴通讯会阻止您这样做,因此您需要以不同方式配置 DBSession

像这样简单的东西会起作用:

DBSession = None

def set_dbsession(session):
global DBSession
if DBSession is not None:
raise AttributeError("DBSession has been already set to %s!" % DBSession)

DBSession = session

然后从 Pyramid 启动代码开始

def main(global_config, **settings):
...
set_dbsession(scoped_session(sessionmaker(extension=ZopeTransactionExtension())))

使用 Celery 有点棘手 - 我最终为 Celery 创建了一个自定义启动脚本,我在其中配置了 session 。

worker egg 的 setup.py 中:

  entry_points="""
# -*- Entry points: -*-
[console_scripts]
custom_celery = worker.celeryd:start_celery
custom_celerybeat = worker.celeryd:start_celerybeat
""",
)

worker/celeryd.py 中:

def initialize_async_session(db_string, db_echo):

import sqlalchemy as sa
from db import Base, set_dbsession

session = sa.orm.scoped_session(sa.orm.sessionmaker(autoflush=True, autocommit=True))
engine = sa.create_engine(db_string, echo=db_echo)
session.configure(bind=engine)

set_dbsession(session)
Base.metadata.bind = engine


def start_celery():
initialize_async_session(DB_STRING, DB_ECHO)
import celery.bin.celeryd
celery.bin.celeryd.main()

如果您打算将应用程序部署到生产服务器,那么您使用的“从 View 启动线程以执行后端操作”的一般方法对我来说有点危险 - 网络服务器经常回收,杀死或创建新的“ worker ”,因此通常无法保证每个特定进程都能在当前请求-响应周期之后继续存在。不过我从来没有尝试过这样做,所以也许你会没事的:)

关于sqlalchemy - celery beat 和 sqlalchemy + pyramid app 出现 "ResourceClosedError: The transaction is closed"错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16338665/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com