gpt4 book ai didi

celery - 运行一组链

转载 作者:行者123 更新时间:2023-12-04 12:46:44 29 4
gpt4 key购买 nike

我正在尝试在 Celery 中运行一组链。我创建了一组链:

chains = [celery.chain(a.s(i), b.s(), c.s()) for i in items]

返回将其包装在一个组中:
group = celery.group(*chains)

这里的期望是 Celery 将安排每个完整链作为独立任务运行。事实上,从逻辑上讲,这就是发生的事情。但是有两个问题:
  • 如果链的数量很大,则似乎什么也没有运行。 Celery 或 rabbitmq 控制台中没有错误。 (是的,使用rabbitmq。)
  • Celery 似乎在移动到每个链的第二个任务之前执行组中所有任务的每个链的第一个任务。 (也就是说,它似乎将链解包成一组任务 a s、任务 b s,然后是任务 c s。它们仍然链接到相应的链条目,但这会引入延迟某些任务 a 完成得比其他任务快得多。

  • 任何想法发生了什么?

    最佳答案

    一个非常有趣的问题!

    I've written code to test your case with memory backend and just one process (it's in the bottom). celery -A module-name --loglevel=info -c 10


  • 类障碍行为:这似乎不是问题。如果您应用不同的 sleep ,或执行大量具有高并行度的任务,您会看到 bc任务与 a 并行执行
  • 大链失败:当我尝试创建 1000000 个链时,代码实际上在链创建时默默地失败了,所以它看起来更像是 python 内存问题。 100000 长度的椅子很好


  • 编码
    from celery import Celery, chain, group
    from pprint import pprint
    import threading
    from time import sleep

    app = Celery('chaintext')
    app.conf.update(
    BROKER_BACKEND = 'memory',
    CELERY_RESULT_BACKEND = 'cache',
    CELERY_CACHE_BACKEND = 'memory',
    CELERY_EAGER_PROPAGATES_EXCEPTIONS = True,
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
    CELERY_ENABLE_UTC=True,
    CELERYD_POOL = 'celery.concurrency.threads:TaskPool'
    )

    @app.task
    def a(i):
    result = 'A %s' % i
    sleep((i%3)/ 10.0)
    pprint(result)
    return result


    @app.task
    def b(self,i):
    result = 'B %s' % i
    sleep((i%3)/ 10.0)
    pprint(result)
    return result

    @app.task
    def c(self,i):
    result = 'C %s' % i
    sleep((i%3)/ 10.0)
    pprint(result)
    return result

    def main():
    print "MAIN"
    import time
    time.sleep(5)
    print "STARTING"
    chains = [chain(a.s(i), b.s(i), c.s(i)) for i in range(1000000)]
    print "CREATED CHAINS"
    g = group(*chains)
    print "CREATED GROUP"
    result = g.apply_async()
    print "QUEUED GROUP"

    print result.get()

    t1 = threading.Thread(target=main)
    t1.start()

    关于celery - 运行一组链,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37076889/

    29 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com