gpt4 book ai didi

python - Celery:轮询正在运行的任务以获取结果

转载 作者:太空宇宙 更新时间:2023-11-04 03:25:40 25 4
gpt4 key购买 nike

项目/

celery .py

from __future__ import absolute_import

from kombu import Exchange, Queue
from celery import Celery

app = Celery('proj',
broker='redis://myredis.com',
backend='redis://myredis.com',
include=['proj.tasks'])

a_exchange = Exchange('a_ex', type='topic')

# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
CELERY_DISABLE_RATE_LIMITS=True,
CELERY_ROUTES = {"app.tasks.timeme": "a"}
)

if __name__ == '__main__':
app.start()

任务.py

from __future__ import absolute_import

from proj.celery import app
import time

@app.task
def timeme(ts):
print 'hi'
lat = time.time() - float(ts)
return (lat, time.time())

do_tasks.py

import proj.tasks
import time
import sys

stime = time.time()
running = []
while time.time() < stime + 15:
res = proj.tasks.timeme.apply_async(args=[time.time()], link=proj.tasks.timeme.s())
running.append(res)

for res in running: #<------------ this gets extremely slow if running gets big!
if res.ready()
print res.get()

在上面的代码中,随着 running 越来越大,循环 running 并查看它是否准备好结果需要很长时间。

在运行 celery 任务时是否有类似 select.selectpoll/epoll 的东西?

所以我可以做类似下面的事情:

While running:
read, w, e = select.select([running], [], [])
print read.get()
running.remove(read)
break

最佳答案

一句话,没有。

但是您可以使用 celery.app.control.inspect 获得您想要的东西

i = app.control.inspect()
i.active()
[{'worker1.example.com':
[{'name': 'tasks.sleeptask',
'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
'args': '(8,)',
'kwargs': '{}'}]}]

关于python - Celery:轮询正在运行的任务以获取结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33093579/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com