gpt4 book ai didi

celery - 如何将 Celery 任务绑定(bind)到特定队列?

转载 作者:行者123 更新时间:2023-12-01 12:40:37 24 4
gpt4 key购买 nike

我想编写一个只能从给定队列中执行的任务 - 如果有人试图将不同的队列传递到 apply_asyncrouting_key 参数我想引发异常。我该怎么做?

最佳答案

您可以编写自己的任务来检查以确保在调用 apply_async 时传入了有效的路由 key 。您也可以将其应用于队列。在您的配置中设置路由和队列:

import celery
from kombu import Queue, Exchange

app = celery.Celery('app')
app.conf.CELERY_QUEUES = (
Queue('add', Exchange('default'), routing_key='good'),
)
app.conf.CELERY_ROUTES = {
'app.add': {
'queue': 'add',
'routing_key': 'good'
}
}

现在,创建您自己的 Task 类,它将对路由键执行检查。您需要覆盖 apply_async:

class RouteCheckerTask(celery.Task):
abstract = True

def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
link=None, link_error=None, **options):

app = self._get_app()
routing_key = options.get('routing_key', None)
if routing_key:
valid_routes = [v['routing_key'] for k, v in app.conf.CELERY_ROUTES.items()]
is_valid = routing_key in valid_routes
if not is_valid:
raise NotImplementedError('{} is not a valid routing key. Options are: {}'.format(routing_key, valid_routes))
if app.conf.CELERY_ALWAYS_EAGER:
return self.apply(args, kwargs, task_id=task_id or uuid(), link=link, link_error=link_error, **options)
# add 'self' if this is a "task_method".
if self.__self__ is not None:
args = args if isinstance(args, tuple) else tuple(args or ())
args = (self.__self__, ) + args
return app.send_task(
self.name, args, kwargs, task_id=task_id, producer=producer,
link=link, link_error=link_error, result_cls=self.AsyncResult,
**dict(self._get_exec_options(), **options)
)

将您的任务基于此任务并正常调用 apply_async:

@app.task(base=RouteCheckerTask)
def add(x, y):
return x + y

# Fails
add.apply_async([1, 2], routing_key='bad')
# Passes
add.apply_async([1, 2], routing_key='good')

关于celery - 如何将 Celery 任务绑定(bind)到特定队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32850516/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com