gpt4 book ai didi

python - celery block 内链

转载 作者:太空狗 更新时间:2023-10-29 19:28:41 25 4
gpt4 key购买 nike

我想在 celery 链命令中使用 block 。

chain = task1.s(arg1) | task2.chunks(?,CHUNK_SIZE) | task3.chunks(?, CHUNK_SIZE)

基本上我想做的是运行 task1,将其结果分 block 并将分 block 发送到 task2,然后 task2 应该调用 task3,task3 也应该从 task2 接收分 block 结果以完成该过程。为什么?因为 task1 和 task2 都可以返回相当数量的项目,我想分批处理这些项目。

上面的代码不起作用,因为我不太确定用什么代替问号才能使其起作用。

我不太确定这是否可行,因为搜索没有提供太多结果,所以如果无法构建这样的工作流程,我会对合理的替代方案感兴趣。

最佳答案

我不确定这对于现有的原语是否可行。

我可以考虑是否有两种选择/解决方法:

  1. 使用 block /和弦从任务中启动新任务。

    你一定已经想到了这个。这个想法是用 apply_async 正常调用 task1。一旦该任务完成生成需要分 block 的大量输出,只需使用 chunks为 task2 进一步创建 block 的原语。同样,对task2和task3之间的转换执行相同的步骤。当您最终等待获取内部任务的结果时,从任务内部调用任务只是一个坏主意。所以请记住,如果您正在等待任务结果,那么这不是推荐的方法。

    @task
    def task1(some_input):
    # Do stuff
    # Create a list of lists where the inner list represent the *args to send to an individual task
    task2.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async()

    @task
    def task2(a, b):
    # Do stuff
    # Create a list of lists where the inner list represent the *args to send to an individual task
    task3.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async()

    @task
    def task3(a, b):
    # Do stuff
  2. 这个解决方案有点有趣。我在 celery Github 问题页面上遇到了一个特定的请求。查看来自 steeve 的这个拉取请求:https://github.com/celery/celery/pull/817据我所知,他创建了一个动态任务装饰器(关于名称是否应该是那个存在争论),它了解任务是否返回子任务。如果是这样,它首先应用该子任务。他声称他在 Veezio 的生产中成功地使用了它。我自己还没有尝试过。我建议转到那个线程并问几个问题。或者甚至通过 Twitter 或 IRC 之类的方式窃听 Steeve。

关于python - celery block 内链,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18783619/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com