gpt4 book ai didi

python - 在 BranchPython Operator 之后跳过 Airflow 2.0 任务

转载 作者:行者123 更新时间:2023-12-05 01:53:19 25 4
gpt4 key购买 nike

我在新版本中摆弄 Airflow 中的分支,无论我尝试什么,都会跳过 BranchOperator 之后的所有任务。

这是我一直在努力完成的一个最小例子

from airflow.decorators import dag, task
from datetime import timedelta, datetime

from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

import logging
logger = logging.getLogger("airflow.task")

@dag(
schedule_interval="0 0 * * *",
start_date=datetime.today() - timedelta(days=2),
dagrun_timeout=timedelta(minutes=60),
)
def StackOverflowExample():

@task
def task_A():

logging.info("TASK A")


@task
def task_B():

logging.info("TASK B")

@task
def task_C():

logging.info("TASK C")

@task
def task_D():

logging.info("TASK D")

return {"parameter":0.5}


def _choose_task(task_parameters,**kwargs):

logging.info(task_parameters["parameter"])
if task_parameters["parameter"]<0.5:
logging.info("SUCCESSS ")
return ['branch_1', 'task_final']
else:
logging.info("RIP")
return ['branch_2', 'task_final']

@task(task_id="branch_1")
def branch_1():
logging.info("branch_1...")

@task(task_id="branch_2")
def branch_2():
logging.info("branch_2")

@task(task_id="task_final")
def task_final():
logging.info("task_final")


parameter = task_A() >> task_B() >> task_C() >> task_D()

choose_task = BranchPythonOperator(
task_id='choose_best_model',
op_kwargs={"task_parameters":parameter},
python_callable=_choose_task,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
)



choose_task >> [branch_1(), branch_2()] >> task_final()


dag = StackOverflowExample ()

Airflow DAG graph

有什么线索吗?我怀疑触发规则。我是 Airflow 的初学者,所以我不会放弃我忽略的任何其他问题

最佳答案

您应该在 task_final 上设置触发规则。您希望 task_finalbranch_1branch_2 完成执行后执行(无论执行/跳过其中一个),因此您需要设置全部完成触发规则:

@task(task_id="task_final", trigger_rule=TriggerRule.ALL_DONE)
def task_final():
logging.info("task_final")

enter image description here

关于python - 在 BranchPython Operator 之后跳过 Airflow 2.0 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71111635/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com