gpt4 book ai didi

python - 如何覆盖 send_task BaseTask 类?

转载 作者:行者123 更新时间:2023-11-28 19:25:57 25 4
gpt4 key购买 nike

我做了一些迷你框架,以便能够在 rabbitmq 关闭的情况下捕获 celery 的连接错误,它以更优雅的方式处理错误并且它工作得很好很好,除非使用 send_task

下面是一些代码来阐明这个想法:

class MyBaseTask(base_task.Task):
""" Base Class to handle tasks from Me hohoho!"""
abstract = True

@classmethod
def delay(cls, *args, **kwargs):
"""Hook to catch connection errors"""
try:
return super(MyBaseTask, cls).apply_async(args, kwargs)
except socket.error as e:
cls._safe_failover() # a function to handle this error
cls.get_logger().error(str(e))
except Exception as e:
cls.get_logger().error("Uknown Error: %s" % str(e))
raise # normal exception

现在我继承 MyBaseTask 类:

class MyL33tTask(MyBaseTask):
name = 'task.my_leet_task'

def run(self, *args, **kwargs):
# yada yada

它会在发生套接字错误时执行 safe_failover 函数(也就是当 rabbitmq 关闭时)。遗憾的是,当我使用 send_task('task.my_leet_task') 时,这不会发生,因为它使用某种代理,其中 MyBaseTask 未加载。

是否有一种简单的方法来覆盖 send_task 以使用 MyBaseTask 代替?

最佳答案

据我所知,您的逻辑的主要用途是确保您确实向代理发送任务。

如果我的理解是正确的,那么您的方法可能是错误的,让我解释一下原因。使用消息传递时,主要优点是您可以通过将消息传递给代理来安排任务,方法 send_task 事实上他对任务本身一无所知,它只是为 Celery 编写格式良好的消息并发送它到配置的代理(http://docs.celeryproject.org/en/latest/faq.html#can-i-call-a-task-by-name)。

考虑到这一点,很明显应该在调用*send_message*的地方处理“发送消息失败”的异常捕获。延迟方法可以保持这种方式,但我建议更明确,并在您实际调用 delay() 的地方保留捕获逻辑,因为如果任务未安排,该怎么做又不取决于任务,而是取决于调度程序。

关于python - 如何覆盖 send_task BaseTask 类?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13188849/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com