gpt4 book ai didi

python - 如何等待 celery 生产者 celery 工作人员完成其任务

转载 作者:行者123 更新时间:2023-12-01 07:10:29 24 4
gpt4 key购买 nike

我最近一直在尝试想出一种好的方法来等待任务生产者由 celery 工作人员完成。我想出了一种方法,但似乎效果不够快,如下:

celery 生产者:

leafs = []

def chain_tasks():
for i in range(1, 10):
p1 = ping1.si(i)
p2 = ping2.si(i)
p3 = ping3.si(i)
p4 = ping4.si(i)
mychain = chain(p1, p2, p3, p4)
leaf_id = mychain.apply_async()
leafs.append(leaf_id)
print('[INFO] Total leafs ->', leafs)


def _cancel_tasks(msg):
print("[ERROR] Dummy Task canceller->", msg)

def parent_succeeds(t):
if t.parent == None:
return True
else:
parent_succeeded = True
parent = t.parent
if parent.state == 'PENDING':
parent_succeeded = parent_succeeds(parent)
if not parent_succeeded:
return False
print('[INFO] Waiting on parentTask({0})...at {1} - {2}'.format(parent, datetime.now().strftime("%H:%M:%S"), parent.state), end='')
parent.wait(propagate=True)
print('Done.')
return parent.state != 'FAILURE'

def wait_for_comp():
print("[INFO] Waiting for celery to finish...")
max_fail = round(len(leafs) / 2)
fail_count = 0
for t in leafs:
if fail_count <= max_fail:
print('[INFO] Waiting on Task({0})...at {1}'.format(t, datetime.now().strftime("%H:%M:%S")))
try:
if parent_succeeds(t):
t.wait(propagate=True)
else:
print('[ERROR] One of the parent failed -> ', t.parent)
except Exception as e:
fail_count += 1
print('[ERROR] Exception Occurred [' + datetime.now().strftime("%H:%M:%S") + '] ->', str(e), fail_count)
print('[ERROR] Traceback [' + datetime.now().strftime("%H:%M:%S") + '] ->', traceback.format_exc())
else:
print("[ERROR] Failed!")
_cancel_tasks('failure of more than half tasks({0}/{1})'.format(fail_count, max_fail))
break
print("[INFO] Done.")

if __name__ == '__main__':
time_start = time.time()
chain_tasks()
wait_for_comp()
print('Finish time %s', time.time() - time_start)

这种方法的一个问题是它等待一系列任务(for 循环),这些任务不一定需要在工作端维护,因为工作器执行是基于rabbit-mq 条目的。所以这需要大量的等待。

是否有其他方法可以提高等待效率?

最佳答案

如果我没有遗漏一些重要的内容,那么简单的解决方案是将 leaf_id = mychain.apply_async() 更改为如下内容:

result_as = mychain.apply_async()
result = result_as.get() # will block until the task is done

注:do not call get() in your tasks .

关于python - 如何等待 celery 生产者 celery 工作人员完成其任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58247559/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com