gpt4 book ai didi

transactions - Pyramid ZopeTransactionExtension 与 celery : how can I commit transaction immediately without keep transaction after web request?

转载 作者:行者123 更新时间:2023-12-02 21:57:34 32 4
gpt4 key购买 nike

这是我的情况。

我正在构建一个 RESTful Web 服务,从客户端接收数据,然后根据该数据创建一个事件,然后我想将这个新事件推送到 celery 以异步处理它。

我使用 Pyramid 构建 RESTful Web 服务,并使用 Pyramid_celery 使 Pyramid 和 celery 协同工作。

这是我的 View 的源代码:

# views.py
# This code recive data from client, then create a new record Event from this

posted_data = schema.deserialize(request.POST.mixed())

e = Event()
e.__dict__.update(posted_data)
DBSession.add(e)
transaction.commit()

print "Commited #%d" % e.id # code mark 01
fire_event.delay(e.id) # fire_event is a celery task
logging.getLogger(__name__).info('Add event #%d to tasks' % e.id)

这是我的任务的源代码:

# tasks.py
@celery.task()
def fire_event(event_id):
e = DBSession.query(Event).get(event_id)

if e is None:
return

print "Firing event %d#%s" % (event_id, e)
logger.info("Firing event %d#%s", event_id, e)

如果我使用 Pyramid 炼金术支架中的默认代码,则会在代码标记 01 行处引发异常。像这样的异常:

DetachedInstanceError: Instance <Event at ...> is not bound to a Session; ...

来自ZopeAlchemy document ,为了避免这种异常,我这样配置 DBSession:

# models.py
DBSession = scoped_session(sessionmaker(
extension=ZopeTransactionExtension(keep_session=True)
))

现在我的问题是在我的 RESTful 请求完成后 Pyramid 与我的 MySQL 服务器保持事务。当 RESTful 请求完成时,我转到 MySQL 服务器并运行命令:

SHOW engine innodb status;

从结果中,我看到了这一点:

--TRANSACTION 180692, ACTIVE 84 sec
MySQL thread id 94, OS thread handle 0x14dc, query id 1219 [domain] [ip] [project name] cleaning up
Trx read view will not see trx with id >= 180693, sees < 180693

这意味着 Pyramid 仍然保持连接,没关系,但是 Pyramid 也启动了一个事务,这是一个问题。当我尝试使用其他工具访问我的 MySQL 服务器时,此事务可能会让我陷入困境。

我的问题是:

如何让 Pyramid 在 RESTful 请求完成后立即关闭事务。如果我不能,对于我的情况还有其他解决方案吗?

非常感谢。

最佳答案

Celery 保持着一种“透明地”将代码作为任务运行的错觉 - 你用 @task 装饰你的函数,然后使用 my_function.delay() ,一切都会神奇地工作。

事实上,要意识到这一点有点棘手,你的代码运行在一个完全不同的进程中,可能在另一台机器上,可能是几分钟/小时后,并且该进程中不存在 Pyramid 请求/响应周期,因此ZopeTransactionExtension不能用于在请求完成时自动提交工作进程中的事务 - 因为没有请求,只有一个长时间运行的工作进程。

所以这不是 Pyramid 让未完成的交易挂起 - 这是你的工作进程。当您调用 e = DBSession.query(Event).get(event_id) 时,事务由 SQLAlchemy 启动,并且永远不会结束。

在这里,我对类似问题写了一个更长的答案,其中包含更多细节:https://stackoverflow.com/a/16346587/320021 - 重点是为您的工作进程使用不同的 session

另一件事是,最好避免在 Pyramid 代码中使用 transaction.commit(),因为对象会过期和其他丑陋的情况。在 Pyramid 中,可以在请求完成后调用一个函数 - 我编写了一个函数,它注册一个回调,从那里调用一个 celery 任务:

from repoze.tm import after_end
import transaction

def invoke_task_after_commit(task_fn, task_args, task_kwargs):
"""
This should ONLY be used within the web-application process managed by repoze.tm2
otherwise a memory leak will result. See http://docs.repoze.org/tm2/#cleanup
for more details.
"""
t = transaction.get() # the current transaction

def invoke():
task_fn.apply_async(
args=task_args,
kwargs=task_kwargs,
)

after_end.register(invoke, t)

(我从函数中删除了很多不相关的代码,因此可能存在拼写错误等。视为伪代码)

关于transactions - Pyramid ZopeTransactionExtension 与 celery : how can I commit transaction immediately without keep transaction after web request?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17499171/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com