gpt4 book ai didi

python - 停止 celery worker 处理任务或监控 Flower 中的 'unconsumed' 任务

转载 作者:太空宇宙 更新时间:2023-11-03 11:40:36 25 4
gpt4 key购买 nike

假设我在 celery 队列上的所有任务都在访问第 3 方 API。但是,API 有一个速率限制,我正在跟踪(有一个我需要遵守的天限制和小时限制)。一旦我达到速率限制,我想暂停新任务的消费,然后在我知道我好的时候恢复。

我通过使用以下两个任务实现了这一点:

@celery.task()
def cancel_api_queue(minutes_to_resume):
resume_api_queue.apply_async(countdown=minutes_to_resume*60, queue='celery')
celery.control.cancel_consumer('third_party', reply=True)

@celery.task(default_retry_delay=300, max_retries=5)
def resume_api_queue():
celery.control.add_consumer('third_party', destination=['y@local'])

然后我可以继续提交我的第 3 方 API 任务,一旦我的消费者被添加回来,我的所有任务都会被消耗掉。太好了。

但是,由于我在这个队列中没有消费者,这似乎意味着我看不到正在提交的作业 Flower任何更多(直到我的消费者被添加)。

我做错了什么吗?我能否通过另一种方式实现这种“暂停”,让我继续在 flower 中查看提交的作业?

附注也许这与此问题有关,但不是 100% 确定:https://github.com/celery/celery/issues/1452

如果有区别,我正在使用 amqp 代理。

感谢女孩和男孩。

最佳答案

我怀疑在工作人员接收消息之前查看队列消息的内容并不是 Flower 预期设计的一部分。因此,如果您停止使用队列中的任务,Flower 能做的最好的事情就是在“Broker” Pane 中以单个数字的形式向您显示有多少任务已入队。

观察传入任务内部结构的一种 hackish 方法可能是添加一个中间虚拟“转发”任务,它只是将消息从一个队列(我们称之为 query_inbox)转发到另一个队列(例如,query_processing)。

例如像这样的东西:

@celery.task(queue='query_inbox')
def query(params):
process_query.delay(params)

@celery.task(queue='query_processing')
def process_query(params):
... do rate-limited stuff ...

现在您可以停止使用来自 query_processing 的任务,但您仍然可以在它们流经 query_inbox worker 时观察它们的参数。

关于python - 停止 celery worker 处理任务或监控 Flower 中的 'unconsumed' 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50280321/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com