gpt4 book ai didi

python - 如何使用分支运算符在 Airflow DAG 中分支多条路径?

转载 作者:行者123 更新时间:2023-12-05 02:00:20 29 4
gpt4 key购买 nike

这就是我想要的,但我不知道如何在 Airflow 中实现这一点,因为这两个任务都在执行中。

enter image description here

总结:

  • T1 执行
  • T2 执行
  • 根据 T2 的输出,我想要么 option_1 -> completeoption_2 -> Do_x, Do_y -> complete/li>

我应该如何构建它?我有这个作为我当前的代码:

(t1 >> t2 >> option_1 >> complete)
(t1 >> t2 >> option_2 >> do_x >> do_y >> complete)

在这种情况下,t2 是分支运算符。

我也尝试过 ... [option_1, option_2] ... 的语法,但我需要一个完全独立的路径来执行,而不仅仅是要切换的单个任务。

最佳答案

您的代码中的依赖关系对于分支是正确的。确保 BranchPythonOperator 根据您需要的任何逻辑在分支开始时返回任务的 task_id。有关 BranchPythonOperator 的更多信息 here .最后一个重要说明与“完成”任务有关。由于分支收敛于“完成”任务,因此确保 trigger_rule 设置为“none_failed”(您也可以使用 TriggerRule 类常量)以便任务不会'被跳过。

快速代码测试供您引用:

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

from datetime import datetime


DEFAULT_ARGS = dict(
start_date=datetime(2021, 5, 5),
owner="airflow",
retries=0,
)

DAG_ARGS = dict(
dag_id="multi_branch",
schedule_interval=None,
default_args=DEFAULT_ARGS,
catchup=False,
)


def random_branch():
from random import randint

return "option_1" if randint(1, 2) == 1 else "option_2"


with DAG(**DAG_ARGS) as dag:
t1 = DummyOperator(task_id="t1")

t2 = BranchPythonOperator(task_id="t2", python_callable=random_branch)

option_1 = DummyOperator(task_id="option_1")

option_2 = DummyOperator(task_id="option_2")

do_x = DummyOperator(task_id="do_x")

do_y = DummyOperator(task_id="do_y")

complete = DummyOperator(task_id="complete", trigger_rule=TriggerRule.NONE_FAILED)

t1 >> t2 >> option_1 >> complete
t1 >> t2 >> option_2 >> do_x >> do_y >> complete

DAG with BranchPythonOperator

关于python - 如何使用分支运算符在 Airflow DAG 中分支多条路径?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67427144/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com