- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
Celery 的 multiprocessing.JoinableQueue
是什么? (或 gevent.queue.JoinableQueue
)?
我正在寻找的功能是能够 .join()
来自发布者的 Celery 任务队列,等待队列中的所有任务完成。
等待初始的 AsyncResult
或 GroupResult
是不够的,因为队列会由 worker 自己动态填满。
最佳答案
它可能并不完美,但这是我最终想到的。
它基本上是一个基于共享 Redis 计数器和列表监听器的现有 Celery 队列之上的 JoinableQueue
包装器。它要求队列名称与其路由键相同(由于 before_task_publish
和 task_postrun
信号的内部实现细节)。
joinableceleryqueue.py:
from celery.signals import before_task_publish, task_postrun
from redis import Redis
import settings
memdb = Redis.from_url(settings.REDIS_URL)
class JoinableCeleryQueue(object):
def __init__(self, queue):
self.queue = queue
self.register_queue_hooks()
def begin(self):
memdb.set(self.count_prop, 0)
@property
def count_prop(self):
return "jqueue:%s:count" % self.queue
@property
def finished_prop(self):
return "jqueue:%s:finished" % self.queue
def task_add(self, routing_key, **kw):
if routing_key != self.queue:
return
memdb.incr(self.count_prop)
def task_done(self, task, **kw):
if task.queue != self.queue:
return
memdb.decr(self.count_prop)
if memdb.get(self.count_prop) == "0":
memdb.rpush(self.finished_prop, 1)
def register_queue_hooks(self):
before_task_publish.connect(self.task_add)
task_postrun.connect(self.task_done)
def join(self):
memdb.brpop(self.finished_prop)
我选择使用 BRPOP
而不是发布/订阅,因为我只需要一个监听器来监听“所有任务已完成”事件(发布者)。
使用 JoinableCeleryQueue
非常简单 - begin()
在将任何任务添加到队列之前,使用常规 Celery API 添加任务,.join()
等待所有任务完成。
关于python - Celery 相当于一个 JoinableQueue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39401374/
JoinableQueue 的文档说 join() Block until all items in the queue have been gotten and processed 我知道当我调用P
我正在尝试对 multiprocessing.JoinableQueue 进行子类化,这样我就可以跟踪被跳过而不是完成的作业。我正在使用 JoinableQueue 将作业传递给一组 multipro
Celery 的 multiprocessing.JoinableQueue 是什么? (或 gevent.queue.JoinableQueue)? 我正在寻找的功能是能够 .join() 来自发布
在 Python 中使用多处理模块时有两种队列: 队列 加入队列。 它们有什么区别? 队列 from multiprocessing import Queue q = Queue() q.put(it
我正在使用 Python multiprocessing.JoinableQueue 类,并且我试图对队列施加大小限制。如果队列已满达到此限制,循环将休眠并在队列中的空间释放时尝试重新添加任务,但我似
我一直在尝试在 Gevent 中查找 JoinableQueue 的长度,但它抛出一个错误,指出队列不可迭代。 有没有办法在我开始从中弹出项目之前找出排队的项目数量。 谢谢 最佳答案 来自here :
直接来自 Python docs : class multiprocessing.Queue([maxsize]) ... qsize() Return the approximate size of
我是一名优秀的程序员,十分优秀!