作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
special_task --> end 中间的任务可以成功也可以失败,但是end 必须始终 被执行(想象一下这是一个干净地关闭资源的任务)。为此-6ren">
我有以下 3 个任务的 DAG:
start --> special_task --> end
中间的任务可以成功也可以失败,但是
end
必须始终 被执行(想象一下这是一个干净地关闭资源的任务)。为此,我使用了
trigger rule
ALL_DONE
:
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE
使用它,
end
如果
special_task
被正确执行失败。但是,由于
end
是最后一个任务并成功,DAG 始终标记为
SUCCESS
.
FAILED
?
import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils import trigger_rule
dag = DAG(
dag_id='my_dag',
start_date=datetime.datetime.today(),
schedule_interval=None
)
start = BashOperator(
task_id='start',
bash_command='echo start',
dag=dag
)
special_task = BashOperator(
task_id='special_task',
bash_command='exit 1', # force failure
dag=dag
)
end = BashOperator(
task_id='end',
bash_command='echo end',
dag=dag
)
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE
start.set_downstream(special_task)
special_task.set_downstream(end)
This post似乎是相关的,但答案不适合我的需要,因为下游任务
end
必须执行(因此必须执行
trigger_rule
)。
最佳答案
我认为这是一个有趣的问题,并花了一些时间弄清楚如何在没有额外虚拟任务的情况下实现它。它变得有点多余,但这是最终结果:
这是完整的 DAG:
import airflow
from airflow import AirflowException
from airflow.models import DAG, TaskInstance, BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
default_args = {"owner": "airflow", "start_date": airflow.utils.dates.days_ago(3)}
dag = DAG(
dag_id="finally_task_set_end_state",
default_args=default_args,
schedule_interval="0 0 * * *",
description="Answer for question https://stackoverflow.com/questions/51728441",
)
start = BashOperator(task_id="start", bash_command="echo start", dag=dag)
failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)
@provide_session
def _finally(task, execution_date, dag, session=None, **_):
upstream_task_instances = (
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.execution_date == execution_date,
TaskInstance.task_id.in_(task.upstream_task_ids),
)
.all()
)
upstream_states = [ti.state for ti in upstream_task_instances]
fail_this_task = State.FAILED in upstream_states
print("Do logic here...")
if fail_this_task:
raise AirflowException("Failing task because one or more upstream tasks failed.")
finally_ = PythonOperator(
task_id="finally",
python_callable=_finally,
trigger_rule=TriggerRule.ALL_DONE,
provide_context=True,
dag=dag,
)
succesful_task = DummyOperator(task_id="succesful_task", dag=dag)
start >> [failing_task, succesful_task] >> finally_
_finally
函数,由 PythonOperator 调用。这里有几个关键点:
@provide_session
注释并添加参数 session=None
,因此您可以使用 session
查询 Airflow DB . upstream_task_instances = (
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.execution_date == execution_date,
TaskInstance.task_id.in_(task.upstream_task_ids),
)
.all()
)
State.FAILED
在那里:upstream_states = [ti.state for ti in upstream_task_instances]
fail_this_task = State.FAILED in upstream_states
print("Do logic here...")
fail_this_task=True
则任务失败:if fail_this_task:
raise AirflowException("Failing task because one or more upstream tasks failed.")
关于python-2.7 - Airflow : DAG marked as "success" if one task fails, 由于触发规则 ALL_DONE,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51728441/
我正在处理的工作流中的一项要求是在给定时间内等待某个事件发生,如果没有发生,则将任务标记为失败,但仍应执行下游任务。 我想知道“all_done”是否意味着所有依赖任务都完成了,无论它们是否成功。 最
我有以下 3 个任务的 DAG: start --> special_task --> end 中间的任务可以成功也可以失败,但是end 必须始终 被执行(想象一下这是一个干净地关闭资源的任务)。为此
我是一名优秀的程序员,十分优秀!