- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
special_task --> end 中间的任务可以成功也可以失败,但是end 必须始终 被执行(想象一下这是一个干净地关闭资源的任务)。为此-6ren">
我有以下 3 个任务的 DAG:
start --> special_task --> end
中间的任务可以成功也可以失败,但是
end
必须始终 被执行(想象一下这是一个干净地关闭资源的任务)。为此,我使用了
trigger rule
ALL_DONE
:
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE
使用它,
end
如果
special_task
被正确执行失败。但是,由于
end
是最后一个任务并成功,DAG 始终标记为
SUCCESS
.
FAILED
?
import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils import trigger_rule
dag = DAG(
dag_id='my_dag',
start_date=datetime.datetime.today(),
schedule_interval=None
)
start = BashOperator(
task_id='start',
bash_command='echo start',
dag=dag
)
special_task = BashOperator(
task_id='special_task',
bash_command='exit 1', # force failure
dag=dag
)
end = BashOperator(
task_id='end',
bash_command='echo end',
dag=dag
)
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE
start.set_downstream(special_task)
special_task.set_downstream(end)
This post似乎是相关的,但答案不适合我的需要,因为下游任务
end
必须执行(因此必须执行
trigger_rule
)。
最佳答案
我认为这是一个有趣的问题,并花了一些时间弄清楚如何在没有额外虚拟任务的情况下实现它。它变得有点多余,但这是最终结果:
这是完整的 DAG:
import airflow
from airflow import AirflowException
from airflow.models import DAG, TaskInstance, BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
default_args = {"owner": "airflow", "start_date": airflow.utils.dates.days_ago(3)}
dag = DAG(
dag_id="finally_task_set_end_state",
default_args=default_args,
schedule_interval="0 0 * * *",
description="Answer for question https://stackoverflow.com/questions/51728441",
)
start = BashOperator(task_id="start", bash_command="echo start", dag=dag)
failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)
@provide_session
def _finally(task, execution_date, dag, session=None, **_):
upstream_task_instances = (
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.execution_date == execution_date,
TaskInstance.task_id.in_(task.upstream_task_ids),
)
.all()
)
upstream_states = [ti.state for ti in upstream_task_instances]
fail_this_task = State.FAILED in upstream_states
print("Do logic here...")
if fail_this_task:
raise AirflowException("Failing task because one or more upstream tasks failed.")
finally_ = PythonOperator(
task_id="finally",
python_callable=_finally,
trigger_rule=TriggerRule.ALL_DONE,
provide_context=True,
dag=dag,
)
succesful_task = DummyOperator(task_id="succesful_task", dag=dag)
start >> [failing_task, succesful_task] >> finally_
_finally
函数,由 PythonOperator 调用。这里有几个关键点:
@provide_session
注释并添加参数 session=None
,因此您可以使用 session
查询 Airflow DB . upstream_task_instances = (
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.execution_date == execution_date,
TaskInstance.task_id.in_(task.upstream_task_ids),
)
.all()
)
State.FAILED
在那里:upstream_states = [ti.state for ti in upstream_task_instances]
fail_this_task = State.FAILED in upstream_states
print("Do logic here...")
fail_this_task=True
则任务失败:if fail_this_task:
raise AirflowException("Failing task because one or more upstream tasks failed.")
关于python-2.7 - Airflow : DAG marked as "success" if one task fails, 由于触发规则 ALL_DONE,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51728441/
问题:假设我们有一个名为success 的C++ 字符串。 &success 和 success& 有什么区别? 这题是我期中复习的,我理解&success是获取对象的地址。但我不确定 success
我正在阅读 gearman 代码的手册页( http://manpages.ubuntu.com/manpages/precise/man3/gearman_success.3.html )。他们有两
在 Angular 中,[class]="'success'" 和 class="success" 有区别吗? 例如: ... 或 ... 如果同时使用 IIUC 后者会导致替换前者,所以我想更好
当我查看日志文件时 D:\SAS\XXX\Lev1\SASMain\BatchServer\Logs 我看到了这两行 NOTE: Libref TESTLIB successfully assigne
我正在努力完成这项工作: $http.post('/route/path', {'username': $scope.threadedUsers[currentIndex].name}). s
我正在尝试设置协议(protocol)来处理我的 javascript 函数中的成功和错误,但是当函数成功时它似乎并不合作。我收到以下错误: Failed with: TypeError: Objec
在 typescript 中,我有一个 DataAccess 类,以便所有 Ajax 调用都通过单个对象进行路由,以节省应用程序中许多地方的代码重复。 在使用这种方法时,我需要使用回调将响应返回到调用
我已经用 Spring Security 3.0.2 实现了一个登录-注销系统,一切都很好,但是对于这一点:在我添加了一个带有 invalid-session-url 属性的 session 管理标签
运行命令pip install rpy2会导致以下输出和错误消息:。我按照另一篇Stackoverflow帖子上的说明卸载并重新安装了R和相关目录,但得到了相同的错误。我还尝试安装以前版本的rpy2,
我最近在项目的测试目标中添加了一个新的单元测试,当我点击产品 > 测试时,Xcode 说“测试成功”。我相当确定测试应该失败,因为被测试的方法尚未实现。我在测试中添加了一个断点,但从未到达过;测试仍然
我正在编写一个程序来创建一个AD帐户并启用Exchange邮箱,并且从中得到一些奇怪的行为。 首先,尽管它成功创建了AD用户,但由于“找不到MyPath/先生示例”,因此无法启用邮箱。我认为这是由于A
在cd_deployer_conf文件中我们设置了Cleanup="false,这样传输包在发布后不会被删除,但是我们可以看到单个发布操作有3个不同的包喜欢 tcm_0-264891-66560.CO
我正在使用 $().each() 循环访问一些项目。我想确保这段脚本之后的操作仅在 each() 完成时执行。 示例: $('something').each(function() { // do
我正在尝试创建一个 ping 洪水程序,它将目标 IP 地址和广播 IP 地址作为参数。该程序将向广播地址发送 icmp echo 数据包,并将受害者的 IP 地址作为源。网络上所有收到数据包的主机都
我正在实现一个虚拟 LTE EPC 设置,其模块 (HSS) 之一需要 mysql 数据库。创建后,我必须运行该模块,但收到屏幕截图 1 中所示的错误。附:我对这个东西很陌生 最佳答案 您必须修改您的
#include #include #include #include #include #include #include #include #include void error(cha
我的基本代码如下, fd = open("test.file", O_RDONLY); if (read(fd, &tempch, 1) < 1) { perror("F
更新 :这似乎是在 Pipeline: Declarative 中引入的错误插件版本 1.3.5 - 降级到 1.3.4.1解决了这个问题。票证创建于: https://issues.jenkins-
我最近在 jQuery 网站上看到了弃用通知。 Deprecation Notice: The jqXHR.success(), jqXHR.error(), and jqXHR.complete()
我假设调用 status.success() 会终止后台作业。但是,当我运行它时,我仍然看到第二条日志消息: Parse.Cloud.job("Tester", function (request,
我是一名优秀的程序员,十分优秀!