gpt4 book ai didi

python - 多台机器上的 Celery 任务

转载 作者:行者123 更新时间:2023-12-01 03:00:13 24 4
gpt4 key购买 nike

我有一台安装了 RabbitMQ 代理的服务器和两个连接到同一个代理的 Celery 使用者(ma​​in1.pyma​​in2.py)。

在第一个消费者 (ma​​in1.py) 中,我实现了一个 Celery Beat,它在特定队列上多次发送不同的任务:

app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
[
('tasks.beat', {'queue': 'print-queue'}),
],
)
app.conf.beat_schedule = {
'beat-every-10-seconds': {
'task': 'tasks.beat',
'schedule': 10.0
},
}

@app.task(name='tasks.beat', bind=True)
def beat(self):
for i in range(10):
app.send_task("tasks.print", args=[i], queue="print-queue")

return None

在第二个消费者(ma​​in2.py)中,我实现了上面所说的任务:

app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
[
('tasks.print', {'queue': 'print-queue'}),
],
)

@app.task(name='tasks.print', bind=True)
def print(self, name):
return name

当我启动两个 Celery 工作程序时:

consumer1: celery worker -A main1 -Q print-queue --beat
consumer2: celery worker -A main2 -Q print-queue

我收到这些错误:

[ERROR/MainProcess] Received unregistered task of type 'tasks.print'

关于第一个消费者

[ERROR/MainProcess] Received unregistered task of type 'tasks.beat'

关于第二个消费者

是否可以在连接到同一代理的不同 Celery 应用程序上拆分任务?

提前致谢!

最佳答案

这就是正在发生的事情。您有两个工作人员 AB,其中一个也恰好在运行 celerybeat(假设一个是 B)。

  1. celerybeat 将task.beat提交到队列。这一切所做的就是在rabbit中加入一些元数据(包括任务名称)的消息。
  2. 两名工作人员之一读取了该消息。 A 和 B 都在监听同一个队列,因此任何一方都可以读取它。

    a.如果 A 读取消息,它将尝试查找名为 tasks.beat 的任务,这会失败,因为 A 没有定义该任务。

    b.如果 B 读取该消息,它将成功尝试找到名为 tasks.beat 的任务(因为它确实有该任务)并将运行代码。 tasks.beat 将在rabbit中排队一条新消息,其中包含tasks.print的元数据。

  3. 同样的问题会再次出现,因为 A 和 B 中只有一个定义了 tasks.print,但其中任何一个都可能收到消息。

实际上, celery 可能会做一些检查以提前抛出错误消息,但我相当确定这是根本问题。

简而言之,队列中的所有工作人员(包括beat)都应该运行相同的代码。

关于python - 多台机器上的 Celery 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43920621/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com