gpt4 book ai didi

python - 本地主机上的 Django/Celery 多个队列 - 路由不起作用

转载 作者:太空狗 更新时间:2023-10-29 17:56:47 28 4
gpt4 key购买 nike

我关注了celery docs在我的开发机器上定义 2 个队列。

我的 celery 设置:

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60 # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
'arena.social.tasks.Update': {
'queue': 'fs_feeds',
},
}

我在项目的 virtualenv 中打开了两个终端窗口,并运行了以下命令:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker

我得到的是两个队列都在处理所有任务。

我的目标是让一个队列只处理 CELERY_ROUTES 中定义的一个任务,并让默认队列处理所有其他任务。

我也关注了这个SO questionrabbitmqctl list_queues返回celery 0,运行rabbitmqctl list_bindings两次返回exchange celery queue celery []。重启兔子服务器没有改变任何东西。

最佳答案

好吧,我想通了。以下是我的整个设置、设置以及如何运行 celery ,供那些可能想知道与我的问题相同的人使用。

设置

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1

# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
'arena.social.tasks.Update': {
'queue': 'feeds',
'routing_key': 'long_tasks',
},
}

如何运行celery?

终端 - 选项卡 1:

celery -A proj worker -Q default -l debug -n default_worker

这将启动第一个使用默认队列中的任务的工作人员。笔记! -n default_worker 对于第一个 worker 来说不是必须的,但是如果你有任何其他 celery 实例正在运行,那么它是必须的。设置 -n worker_name--hostname=default@%h 相同。

终端 - 选项卡 2:

celery -A proj worker -Q feeds -l debug -n feeds_worker

这将启动第二个 worker,它从 feeds 队列中消费任务。请注意 -n feeds_worker,如果您使用 -l debug(日志级别 = debug)运行,您将看到两个 worker 正在同步。

终端 - 选项卡 3:

celery -A proj beat -l debug

这将启动节拍,根据您的 CELERYBEAT_SCHEDULE 中的时间表执行任务。我不必更改任务或 CELERYBEAT_SCHEDULE

例如,这是我的 CELERYBEAT_SCHEDULE 应该进入提要队列的任务的样子:

CELERYBEAT_SCHEDULE = {
...
'update_feeds': {
'task': 'arena.social.tasks.Update',
'schedule': crontab(minute='*/6'),
},
...
}

如您所见,无需添加 'options': {'routing_key': 'long_tasks'} 或指定它应该进入的队列。另外,如果您想知道为什么 Update 是大写的,那是因为它是一个自定义任务,被定义为 celery.Task 的子类。

更新 Celery 5.0+

Celery 自版本 5 以来进行了一些更改,这里是任务路由的更新设置。

如何创建队列?

Celery 可以自动创建队列。它非常适合简单的情况,其中路由的 celery 默认值是可以的。

task_create_missing_queues=True 或者,如果您使用的是 django 设置并且您在 CELERY_ 键下命名空间所有 celery 配置,CELERY_TASK_CREATE_MISSING_QUEUES=True。请注意,它默认处于启用状态。

自动计划任务路由

配置 celery 应用程序后:

celery_app.conf.beat_schedule = {
"some_scheduled_task": {
"task": "module.path.some_task",
"schedule": crontab(minute="*/10"),
"options": {"queue": "queue1"}
}
}

自动任务路由

Celery 应用程序仍然需要先配置,然后:

app.conf.task_routes = {
"module.path.task2": {"queue": "queue2"},
}

任务的手动路由

如果您想动态路由任务,那么在发送任务时指定队列:

from module import task

def do_work():
# do some work and launch the task
task.apply_async(args=(arg1, arg2), queue="queue3")

可以在此处找到有关重新路由的更多详细信息: https://docs.celeryproject.org/en/stable/userguide/routing.html

关于在这里调用任务: https://docs.celeryproject.org/en/stable/userguide/calling.html

关于python - 本地主机上的 Django/Celery 多个队列 - 路由不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23129967/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com