gpt4 book ai didi

concurrency - 使用 Celery 同时由两个工作进程运行的独特任务

转载 作者:可可西里 更新时间:2023-11-01 11:15:29 60 4
gpt4 key购买 nike

我正在从事一个项目,该项目的目标是运行一个将任务发送到 Celery 队列的守护进程,Redis 用作代理。每个任务必须一次处理一次(不允许并发)。

为了执行此操作,我在我的守护进程中实现了以下代码,它充当 Redis 的锁:

while True:    
for foo in bar:
if not self.redis_client.exists(foo.name):
# Send the task to the Celery queue
task = celery_app.send_task('buzz', context={'name': foo.name})
redis_client.send(foo.name, task.id)
time.sleep(10)

一旦任务完成或失败,锁将由任务自行释放。

由于某些我不明白的原因,任务有时会同时由两个工作进程运行:

[2018-04-11 15:23:45,705: INFO/ForkPoolWorker-1] Task has been executed in 101.43s for foo
[2018-04-11 15:23:45,881: INFO/ForkPoolWorker-4] Task has been executed in 114.66s for foo

它不会经常发生,但我不希望它发生。什么可以解释这种行为?会不会跟 Redis 写键值对的开销时间有关?

作为附加信息,我还在同一台服务器上运行了一个 Flower 实例。

最佳答案

这里有很多遗漏的细节,但我会尽力提供帮助:由于您的要求 - 没有并发 - 我猜你只有一个 celery worker 在运行。当您运行此工作程序时,您可以通过 -c 标志(或 --concurrency)指定并发级别 - 确保将其设置为 1,以便只有该工作程序的一个实例会住一次。引用 here

例如:celery -A proj worker --loglevel=INFO --concurrency=1 -n worker1@%h

您应该注意的另一件事是 worker_prefetch_multiplier,它默认一次预取 4 条消息。您可能也想将其更改为 1(我猜您没有描述您的完整场景)。引用 here

关于您的 redis 锁的最后一件事,请考虑使用 SETNX(如果不存在则设置)- 更多信息 - here

祝你好运!

关于concurrency - 使用 Celery 同时由两个工作进程运行的独特任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49779372/

60 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com