gpt4 book ai didi

python - Celery+Django -- 具有数据库相关任务的原子事务

转载 作者:太空宇宙 更新时间:2023-11-03 17:22:30 24 4
gpt4 key购买 nike

在我当前使用 Django、Docker-Compose 和 Celery(以及其他)的项目中,基本上传文件函数 insertIntoDatabase 是从任务和 View 中调用的。 py 任务被延迟调用。

在数据库插入器.py中:

def insertIntoDatabase(datapoints, user, description): # datapoints is a list of dictionaries, user and description are just strings
# convert data and upload to our database

在tasks.py中:

@app.task()
def db_ins_task(datapoints, user, description):
from databaseinserter import insertIntoDatabase
insertIntoDatabase(datapoints, user, description)

在views.py中:

with transaction.atomic():
db_ins_task.delay(datapoints, user, description)

在将 Celery 引入项目之前,只是在 views.py 中直接调用 insertIntoDatabase,因此不会插入任何无效的数据点列表(即格式不正确)整个上传将被取消并回滚。然而,现在上传是在异步 celery 任务中,无效的上传不再被正确回滚。既然上传是一项任务,如何确保无效上传仍然被取消并完全撤消? Django 1.9 似乎有一些新功能可能正是我所需要的:transaction.on_commit。然而,目前切换到 1.9 的主要问题是,我们项目中的一个重要依赖项 Django-Hstore 似乎并不兼容。 1.9 也处于 alpha 阶段,因此即使两者兼容,目前使用起来也并不理想。有没有办法在 Django 1.8 中做到这一点?

我还研究了 django_transaction_barrier 并尝试使用它,但没有成功。在tasks.py中我将任务更改为

@task(base=TransactionBarrierTask)
def db_ins_task(datapoints, user, description):
from databaseinserter import insertIntoDatabase
insertIntoDatabase(datapoints, user, description)

在views.py中我更改了任务执行:

with transaction.atomic():
db_ins_task.apply_async_with_barrier(args=(data, user, description,))

但是,我的主要问题是,一旦收到任务,Celery 就会抛出一个关于意外关键字参数的错误:

worker_1   | Traceback (most recent call last):
worker_1 | File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
worker_1 | R = retval = fun(*args, **kwargs)
worker_1 | File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
worker_1 | return self.run(*args, **kwargs)
worker_1 | TypeError: db_ins_task() got an unexpected keyword argument '__transaction_barrier'

那么,解决这个问题的最佳方法是什么?我应该继续尝试使用 django_transaction_barrier (如果我确实将它用于正确的事情)?如果是这样,我做错/遗漏了什么会导致错误?如果没有,有什么更好的方法来从我的数据库中清除无效上传?

最佳答案

Celery 是一个异步任务运行程序,基本上一旦任务交给 celery,它就会立即生效。您不能跨进程边界进行事务,因为 celery 将作为工作线程运行。

您始终可以运行另一个任务来查找无效数据点并清理数据库。简而言之,您想要一个具有两阶段提交的分布式事务,但这并不容易实现,因为它有自己的问题,并且不确定在 Python 中是否可用。

关于python - Celery+Django -- 具有数据库相关任务的原子事务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32999857/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com