gpt4 book ai didi

python - Airflow 在单个 DAG 中生成动态任务,任务 N+1 依赖于任务 N

转载 作者:太空宇宙 更新时间:2023-11-03 13:27:33 25 4
gpt4 key购买 nike

当动态生成任务时,我需要让任务 2 依赖于任务 1,任务 1 >> 任务 2 或 task2.set_upstream(task1)。

由于 task_ids 被评估,或者看起来是预先的,我无法提前设置依赖关系,任何帮助将不胜感激。

Component(I) 任务生成良好,只是它们全部同时运行。

for i in range(1,10):
task_id='Component'+str(i)
task_id = BashOperator(
task_id='Component'+str(i),
bash_command="echo {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
xcom_push=True,
dag=dag)
?????.set_upstream(??????)

最佳答案

对于 Airflow >=2.3

您可以使用 Dynamic Task Mapping 原生支持动态任务的功能

BashOperator.partial(task_id="Component", do_xcom_push=True).expand(
bash_command=[
"echo {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i)
for i in range(0, 10)
]
)

对于 Airflow <2.3

使用以下代码:

a = []
for i in range(0,10):
a.append(BashOperator(
task_id='Component'+str(i),
bash_command="echo {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
xcom_push=True,
dag=dag))
if i not in [0]:
a[i-1] >> a[i]

使用 DummyOperator,代码如下所示:

a = []
for i in range(0,10):
a.append(DummyOperator(
task_id='Component'+str(i),
dag=dag))
if i not in [0]:
a[i-1] >> a[i]

这将生成以下 DAG:

enter image description here

关于python - Airflow 在单个 DAG 中生成动态任务,任务 N+1 依赖于任务 N,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52558018/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com