gpt4 book ai didi

rabbitmq - Celery:连接错误时中止任务

转载 作者:行者123 更新时间:2023-12-02 21:35:08 25 4
gpt4 key购买 nike

我必须实现一个任务子类,如果代理没有运行,它会优雅地失败 - 目前我正在使用 RabbitMQ。我可能可以使用 try 语句来捕获异常:

try:
Mytask.delay(arg1, arg2)
except socket.error:
# Send an notice to an admin
pass

但我想创建一个可以处理该问题的 Task 子类。我尝试过类似的事情:

class MyTask(Task):
ignore_result = True

def __call__(self, *args, **kwargs):
try:
return self.run(*args, **kwargs)
except socket.error:
# Send an notice to an admin
return None

但工作流程显然是错误的。我想我需要以某种方式注入(inject)后端子类或失败策略。您有什么建议吗?

最佳答案

我想出的一个可能的解决方案:

import socket
from celery.decorators import task
from celery.task import Task
from celery.backends.base import BaseBackend

UNDELIVERED = 'UNDELIVERED'


class DummyBackend(BaseBackend):
"""
Dummy queue backend for undelivered messages (due to the broker being down).
"""
def store_result(self, *args, **kwargs):
pass

def get_status(self, *args, **kwargs):
return UNDELIVERED

def _dummy(self, *args, **kwargs):
return None

wait_for = get_result = get_traceback = _dummy


class SafeTask(Task):
"""
A task not raising socket errors if the broker is down.
"""
abstract = True
on_broker_error = None
errbackend = DummyBackend

@classmethod
def apply_async(cls, *args, **kwargs):
try:
return super(SafeTask, cls).apply_async(*args, **kwargs)
except socket.error, err:
if cls.on_broker_error is not None:
cls.on_broker_error(err, cls, *args, **kwargs)
return cls.app.AsyncResult(None, backend=cls.errbackend(),
task_name=cls.name)


def safetask(*args, **kwargs):
"""
Task factory returning safe tasks handling socket errors.
When a socket error occurs, the given callable *on_broker_error*
is called passing the exception object, the class of the task
and the original args and kwargs.
"""
if 'base' not in kwargs:

on_broker_error = kwargs.pop('on_broker_error', SafeTask.on_broker_error)
errbackend = kwargs.pop('errbackend', SafeTask.errbackend)
kwargs['base'] = type('SafeTask', (SafeTask,), {
'on_broker_error': staticmethod(on_broker_error),
'errbackend': errbackend,
'abstract': True,
})

return task(*args, **kwargs)

您可以子类化 SafeTask 或使用装饰器 @safetask。如果您能想到改进,请毫不犹豫地做出贡献。

关于rabbitmq - Celery:连接错误时中止任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7673092/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com