- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我关注了celery docs在我的开发机器上定义 2 个队列。
我的 celery 设置:
CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60 # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
'arena.social.tasks.Update': {
'queue': 'fs_feeds',
},
}
我在项目的 virtualenv 中打开了两个终端窗口,并运行了以下命令:
terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker
我得到的是两个队列都在处理所有任务。
我的目标是让一个队列只处理 CELERY_ROUTES
中定义的一个任务,并让默认队列处理所有其他任务。
我也关注了这个SO question ,rabbitmqctl list_queues
返回celery 0
,运行rabbitmqctl list_bindings
两次返回exchange celery queue celery []
。重启兔子服务器没有改变任何东西。
最佳答案
好吧,我想通了。以下是我的整个设置、设置以及如何运行 celery ,供那些可能想知道与我的问题相同的人使用。
设置
CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
'arena.social.tasks.Update': {
'queue': 'feeds',
'routing_key': 'long_tasks',
},
}
如何运行celery?
终端 - 选项卡 1:
celery -A proj worker -Q default -l debug -n default_worker
这将启动第一个使用默认队列中的任务的工作人员。笔记! -n default_worker
对于第一个 worker 来说不是必须的,但是如果你有任何其他 celery 实例正在运行,那么它是必须的。设置 -n worker_name
与 --hostname=default@%h
相同。
终端 - 选项卡 2:
celery -A proj worker -Q feeds -l debug -n feeds_worker
这将启动第二个 worker,它从 feeds 队列中消费任务。请注意 -n feeds_worker
,如果您使用 -l debug
(日志级别 = debug)运行,您将看到两个 worker 正在同步。
终端 - 选项卡 3:
celery -A proj beat -l debug
这将启动节拍,根据您的 CELERYBEAT_SCHEDULE
中的时间表执行任务。我不必更改任务或 CELERYBEAT_SCHEDULE
。
例如,这是我的 CELERYBEAT_SCHEDULE
应该进入提要队列的任务的样子:
CELERYBEAT_SCHEDULE = {
...
'update_feeds': {
'task': 'arena.social.tasks.Update',
'schedule': crontab(minute='*/6'),
},
...
}
如您所见,无需添加 'options': {'routing_key': 'long_tasks'}
或指定它应该进入的队列。另外,如果您想知道为什么 Update
是大写的,那是因为它是一个自定义任务,被定义为 celery.Task
的子类。
更新 Celery 5.0+
Celery 自版本 5 以来进行了一些更改,这里是任务路由的更新设置。
如何创建队列?
Celery 可以自动创建队列。它非常适合简单的情况,其中路由的 celery 默认值是可以的。
task_create_missing_queues=True
或者,如果您使用的是 django 设置并且您在 CELERY_
键下命名空间所有 celery 配置,CELERY_TASK_CREATE_MISSING_QUEUES=True
。请注意,它默认处于启用状态。
自动计划任务路由
配置 celery 应用程序后:
celery_app.conf.beat_schedule = {
"some_scheduled_task": {
"task": "module.path.some_task",
"schedule": crontab(minute="*/10"),
"options": {"queue": "queue1"}
}
}
自动任务路由
Celery 应用程序仍然需要先配置,然后:
app.conf.task_routes = {
"module.path.task2": {"queue": "queue2"},
}
任务的手动路由
如果您想动态路由任务,那么在发送任务时指定队列:
from module import task
def do_work():
# do some work and launch the task
task.apply_async(args=(arg1, arg2), queue="queue3")
可以在此处找到有关重新路由的更多详细信息: https://docs.celeryproject.org/en/stable/userguide/routing.html
关于在这里调用任务: https://docs.celeryproject.org/en/stable/userguide/calling.html
关于python - 本地主机上的 Django/Celery 多个队列 - 路由不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23129967/
我对 Python-Django 和 web 开发还很陌生,我被困在这个使用 POST 创建新资源的特殊问题上。 我正在为 REST API 使用 Django REST 框架,我正在尝试创建一个新资
我已经使用 Django-storages 成功地将 Word 文档存储到 S3。 class Document(TitleSlugDescriptionModel, TimeStampedModel
我有 2 个关于模型代理的问题, 如何从模型对象创建代理对象? 如何从模型查询集创建代理查询集? 例如,假设我们定义了: from django.contrib.auth.models import
我想编写一个直接执行 HTTP 请求的单元测试(而不是使用 django.test.client.Client)。 如果您好奇为什么 - 那是因为我想测试我从 Django 应用程序公开的 Thrif
我为我的个人网站启动了一个 django 项目来学习 django。到目前为止,我已经将我的开发环境设置为我需要的一切,并遵循 this很棒的教程来创建一些基本的数据结构和模板。现在我想开始使用我之前
我已经阅读了很多关于如何在使用 Django 注册时添加额外字段的信息,例如 here 、 here 和 here 。代码片段是: forms.py(来自注册应用程序) class Registrat
我正在编写小型社交应用程序。功能之一是在网站标题中写入用户名。因此,例如,如果我登录并且我的名字是Oleg(用户名),那么我应该看到: Hello, Oleg | Click to edit prof
我有一个使用 Django 和 Django Rest 框架开发的应用程序。我想将 django-reversion 功能添加到我的应用程序中。 我已经尝试过http://django-reversi
我有一个简单的 HTML 表单,我没有使用 Django 表单,但现在我想添加一个选择。 选择最容易创建为 Django ChoiceField (与通过循环等手动创建选择相反),但是,如果没有在 D
我不明白为什么人们以两种方式编写外键,这样做的目的是什么?它们是相同还是不同? 我注意到有些人这样写: author = models.ForeignKey(Author, on_delete=mod
我想在我的 Django 应用程序中获取评论最多的十个帖子,但我做不到,因为我想不出合适的方法。 我目前正在使用 django 评论框架,并且我已经看到使用 aggregate or annotate
这对于 Django 1.2 仍然有效吗? Custom Filter in Django Admin on Django 1.3 or below 我已经尝试过,但管理类中的 list_filter
问题在于,当 django-compressor 编译为 .js 文件的 CoffeeScript 文件中引用 {{ STATIC_URL }} 时,它无法正确加载。 在我的 django 模板中,我
我正在尝试将一些字段从一个 django 模型移动到一个新模型。假设我有一个书籍模型: class Book(models.Model): title = models.CharField(max
我想在我的 Django 应用程序中获取评论最多的十个帖子,但我做不到,因为我想不出合适的方法。 我目前正在使用 django 评论框架,并且我已经看到使用 aggregate or annotate
目前我正在寻找在 Django 中实现访问控制。我已经阅读了有关内置权限的内容,但它并不关心每个对象的基础。例如,我想要“只有创建者可以删除自己的项目”之类的权限。所以我读到了 django-guar
嗨,我正在将我的 Django 模型的一个字段的值设置为其他模型的另一个字段的值。这个值应该是动态变化的。 这是我的第一个模型 class MainModel(AbstractBaseUser, Pe
我正在尝试为我的模型创建一个编辑表单。我没有使用模型表单,因为根据模型类型,用户可以使用不同的表单。 (例如,其中一个表单有 Tinymce 小部件,而另一个没有。) 有没有什么方法可以使用模型设置表
Django 模板中的搜索字段 如何在类似于此图像的 Django 模板中创建搜索字段 http://asciicasts.com/system/photos/1204/original/E354I0
根据 Django documentation ,如果 Django 安装激活了 AuthenticationMiddleware,HttpRequest 对象有一个“user”属性代表当前登录的用户
我是一名优秀的程序员,十分优秀!