
python-2.7 - Airflow: DAG marked as "success" if one task fails, due to trigger rule ALL_DONE


I have the following DAG with 3 tasks:

start --> special_task --> end
The task in the middle can succeed or fail, but end must always be executed (think of it as a task that cleanly closes resources). For this, I use the trigger rule ALL_DONE:
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE
Using it, end is properly executed if special_task fails. However, since end is the last task and succeeds, the DAG is always marked as SUCCESS.
How can I configure my DAG so that if one of the tasks fails, the whole DAG is marked as FAILED?
Example to reproduce
import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils import trigger_rule

dag = DAG(
    dag_id='my_dag',
    start_date=datetime.datetime.today(),
    schedule_interval=None
)

start = BashOperator(
    task_id='start',
    bash_command='echo start',
    dag=dag
)

special_task = BashOperator(
    task_id='special_task',
    bash_command='exit 1',  # force failure
    dag=dag
)

end = BashOperator(
    task_id='end',
    bash_command='echo end',
    dag=dag
)
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE

start.set_downstream(special_task)
special_task.set_downstream(end)
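
As a side note (a minimal sketch using the same Airflow 1.x imports as above, not part of the original question), the trigger rule can equivalently be passed directly to the operator constructor, and the dependencies written with >>:

# Equivalent sketch: set the trigger rule in the constructor instead of
# assigning it afterwards, and chain the tasks with >>.
end = BashOperator(
    task_id='end',
    bash_command='echo end',
    trigger_rule=trigger_rule.TriggerRule.ALL_DONE,  # same ALL_DONE rule as above
    dag=dag
)

start >> special_task >> end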
This post seems related, but its answer does not suit my need, since the downstream task end must be executed (hence the mandatory trigger_rule).

Best Answer

I thought this was an interesting question and spent some time figuring out how to achieve it without an extra dummy task. It got a bit redundant, but here's the final result:

Here is the full DAG:

import airflow
from airflow import AirflowException
from airflow.models import DAG, TaskInstance, BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule

default_args = {"owner": "airflow", "start_date": airflow.utils.dates.days_ago(3)}

dag = DAG(
    dag_id="finally_task_set_end_state",
    default_args=default_args,
    schedule_interval="0 0 * * *",
    description="Answer for question https://stackoverflow.com/questions/51728441",
)

start = BashOperator(task_id="start", bash_command="echo start", dag=dag)
failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)


@provide_session
def _finally(task, execution_date, dag, session=None, **_):
    upstream_task_instances = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag.dag_id,
            TaskInstance.execution_date == execution_date,
            TaskInstance.task_id.in_(task.upstream_task_ids),
        )
        .all()
    )
    upstream_states = [ti.state for ti in upstream_task_instances]
    fail_this_task = State.FAILED in upstream_states

    print("Do logic here...")

    if fail_this_task:
        raise AirflowException("Failing task because one or more upstream tasks failed.")


finally_ = PythonOperator(
    task_id="finally",
    python_callable=_finally,
    trigger_rule=TriggerRule.ALL_DONE,
    provide_context=True,
    dag=dag,
)

succesful_task = DummyOperator(task_id="succesful_task", dag=dag)

start >> [failing_task, succesful_task] >> finally_

The _finally function is called by the PythonOperator. A few key points here:
  • Annotate the function with @provide_session and add the argument session=None, so you can use session to query the Airflow DB.
  • Query all upstream task instances of the current task (a variant that avoids the manual query is sketched after this list):

        upstream_task_instances = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == dag.dag_id,
                TaskInstance.execution_date == execution_date,
                TaskInstance.task_id.in_(task.upstream_task_ids),
            )
            .all()
        )

  • From the returned task instances, take their states and check whether State.FAILED is among them:

        upstream_states = [ti.state for ti in upstream_task_instances]
        fail_this_task = State.FAILED in upstream_states

  • Do your own logic:

        print("Do logic here...")

  • Finally, fail the task if fail_this_task=True:

        if fail_this_task:
            raise AirflowException("Failing task because one or more upstream tasks failed.")
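
As a variant (a minimal sketch, not part of the original answer; it assumes the same imports and DAG as above), the upstream states can also be read from the dag_run object that Airflow puts into the task context when provide_context=True, which avoids the explicit session query:

def _finally(task, dag_run, **_):
    # dag_run comes from the task context; its task instances cover all
    # tasks of the current run, so filter down to the upstream ones.
    upstream_states = [
        ti.state
        for ti in dag_run.get_task_instances()
        if ti.task_id in task.upstream_task_ids
    ]

    print("Do logic here...")

    if State.FAILED in upstream_states:
        raise AirflowException("Failing task because one or more upstream tasks failed.")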

Final result:

[screenshot of the resulting DAG run]

Regarding python-2.7 - Airflow: DAG marked as "success" if one task fails, due to trigger rule ALL_DONE, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51728441/
