gpt4 book ai didi

python - Celery:如何将失败的任务路由到死信队列

转载 作者:太空狗 更新时间:2023-10-29 18:08:33 34 4
gpt4 key购买 nike

我是 celery 的新手,我尝试将这个任务队列集成到我的项目中,但我仍然不明白 celery 如何处理失败的任务,我想将所有这些保留在 amqp 死信中排队。

根据文档 here似乎在启用了 acks_late 的任务中引发 Reject 会产生与确认消息相同的效果,然后我们再谈谈死信队列。

所以我在我的 celery 配置中添加了一个自定义默认队列

celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'],
CELERY_TASK_SERIALIZER='json',
CELERY_QUEUES=[CELERY_QUEUE,
CELERY_DLX_QUEUE],
CELERY_DEFAULT_QUEUE=CELERY_QUEUE_NAME,
CELERY_DEFAULT_EXCHANGE=CELERY_EXCHANGE
)

我的海带元素看起来像

CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct')
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME, exchange=DLX_EXCHANGE,
routing_key='celery-dlq')

DEAD_LETTER_CELERY_OPTIONS = {'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME,
'x-dead-letter-routing-key': 'celery-dlq'}

CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME,
arguments=DEAD_LETTER_CELERY_OPTIONS,
type='direct')

CELERY_QUEUE = Queue(CELERY_QUEUE_NAME,
exchange=CELERY_EXCHANGE,
routing_key='celery-q')

我正在执行的任务是:

class HookTask(Task):
acks_late = True

def run(self, ctx, data):
logger.info('{0} starting {1.name}[{1.request.id}]'.format(self.__class__.__name__.upper(), self))
self.hook_process(ctx, data)


def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.error('task_id %s failed, message: %s', task_id, exc.message)

def hook_process(self, t_ctx, body):
# Build context
ctx = TaskContext(self.request, t_ctx)
logger.info('Task_id: %s, handling request %s', ctx.task_id, ctx.req_id)
raise Reject('no_reason', requeue=False)

我用它做了一些测试,但在引发拒绝异常时没有结果。

现在我想知道通过覆盖 Task.on_failure 将失败的任务路由强制到死信队列是否是个好主意。我认为这可行,但我也认为这个解决方案不是那么干净,因为根据我的说法,红 celery 应该单独完成这一切。

感谢您的帮助。

最佳答案

我认为你不应该在 CELERY_EXCHANGE 中添加 arguments=DEAD_LETTER_CELERY_OPTIONS。您应该使用 queue_arguments=DEAD_LETTER_CELERY_OPTIONS 将其添加到 CELERY_QUEUE。

下面的例子是我做的,效果很好:

from celery import Celery
from kombu import Exchange, Queue
from celery.exceptions import Reject

app = Celery(
'tasks',
broker='amqp://guest@localhost:5672//',
backend='redis://localhost:6379/0')

dead_letter_queue_option = {
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'dead_letter'
}

default_exchange = Exchange('default', type='direct')
dlx_exchange = Exchange('dlx', type='direct')

default_queue = Queue(
'default',
default_exchange,
routing_key='default',
queue_arguments=dead_letter_queue_option)
dead_letter_queue = Queue(
'dead_letter', dlx_exchange, routing_key='dead_letter')

app.conf.task_queues = (default_queue, dead_letter_queue)

app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'


@app.task
def add(x, y):
return x + y


@app.task(acks_late=True)
def div(x, y):
try:
z = x / y
return z
except ZeroDivisionError as exc:
raise Reject(exc, requeue=False)

创建队列后,你应该看到在'Features'列中,它显示了DLX(dead-letter-exchange)和DLK(dead-letter -路由键)标签。

enter image description here

注意:如果您已经在 RabbitMQ 中创建了之前的队列,则应删除它们。这是因为 celery 不会删除现有队列并重新创建一个新队列。

关于python - Celery:如何将失败的任务路由到死信队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38111122/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com