gpt4 book ai didi

python - 编写和运行任务 DAG 的最简洁方法是什么?

转载 作者:太空宇宙 更新时间:2023-11-04 01:48:31 25 4
gpt4 key购买 nike

我想编写并运行一个有向无环图 (DAG),其中包含多个串行或并行运行的任务。理想情况下它看起来像:

def task1():
# ...

def task2():
# ...

graph = Sequence([
task1,
task2,
Parallel([
task3,
task4
]),
task5
]

graph.run()

它将运行 1 -> 2 ->(同时运行 3 和 4)-> 5。任务需要访问全局范围以存储结果、写入日志和访问命令行参数。

我的用例是编写部署脚本。 并行任务是 IO 绑定(bind)的:通常在远程服务器上等待完成一个步骤。

我研究了线程、asyncio、Airflow,但没有找到任何简单的库可以在没有一些样板代码的情况下允许它遍历和控制图形的执行。有没有这样的东西?

最佳答案

这是一个快速的概念验证实现。它可以像这样使用:

graph = sequence(
lambda: print(1),
lambda: print(2),
parallel(
lambda: print(3),
lambda: print(4),
sequence(
lambda: print(5),
lambda: print(6))),
lambda: print(7)

graph()

1
2
3
5
6
4
7

sequence 生成一个包含 for 循环的函数,parallel 生成一个包含线程池使用的函数:

from typing import Callable
from multiprocessing.pool import ThreadPool

Task = Callable[[], None]

_pool: ThreadPool = ThreadPool()

def sequence(*tasks: Task) -> Task:
def run():
for task in tasks:
task()

return run # Returning "run" to be used as a task by other "sequence" and "parallel" calls

def parallel(*tasks: Task) -> Task:
def run():
_pool.map(lambda f: f(), tasks) # Delegate to a pool used for IO tasks

return run

每次调用 sequenceparallel 都会返回一个新的“任务”(一个不带参数且不返回任何内容的函数)。然后,该任务可以由其他外部调用 sequenceparallel 调用。

关于ThreadPool的注意事项:

  • 虽然这确实为 parallel 使用了线程池,但由于 GIL,这仍然一次只能执行一件事。这意味着 parallel 对于 CPU 密集型任务基本上没有用。

  • 我没有指定池应该以多少线程开始。我认为它默认为您可以使用的内核数。如果需要更多,您可以使用 ThreadPool 的第一个参数来指定要开始的数量。

  • 为简洁起见,我不会清理 ThreadPool。如果你使用它,你绝对应该这样做。

  • 尽管 ThreadPoolmultiprocessing 的一部分,但令人困惑的是它使用的是线程而不是进程。

关于python - 编写和运行任务 DAG 的最简洁方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58581270/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com