- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
在我的 DAG 中,我有一些任务只能在周六运行。因此,我使用 BranchPythonOperator 在星期六的任务和 DummyTask 之间进行分支。之后,我加入了两个分支并想运行其他任务。
工作流程如下所示:
在这里,我将 dummy3 的触发规则设置为 'one_success'
并且一切正常。
我遇到的问题是当 BranchPythonOperator 上游的某些东西失败时:
BranchPythonOperator 和分支正确地具有状态'upstream_failed'
,但加入分支的任务变为'skipped'
,因此整个工作流显示'success'
.
我尝试使用 'all_success'
作为触发规则,然后如果某事失败整个工作流程失败,它会正常工作,但如果没有失败,dummy3 将被跳过。
我还尝试将 'all_done'
作为触发规则,如果没有失败,它会正常工作,但如果有失败,dummy3 仍然会被执行。
我的测试代码是这样的:
from datetime import datetime, date
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator
dag = DAG('test_branches',
description='Test branches',
catchup=False,
schedule_interval='0 0 * * *',
start_date=datetime(2018, 8, 1))
def python1():
raise Exception('Test failure')
# print 'Test success'
dummy1 = PythonOperator(
task_id='python1',
python_callable=python1,
dag=dag
)
dummy2 = DummyOperator(
task_id='dummy2',
dag=dag
)
dummy3 = DummyOperator(
task_id='dummy3',
dag=dag,
trigger_rule='one_success'
)
def is_saturday():
if date.today().weekday() == 6:
return 'dummy2'
else:
return 'today_is_not_saturday'
branch_on_saturday = BranchPythonOperator(
task_id='branch_on_saturday',
python_callable=is_saturday,
dag=dag)
not_saturday = DummyOperator(
task_id='today_is_not_saturday',
dag=dag
)
dummy1 >> branch_on_saturday >> dummy2 >> dummy3
branch_on_saturday >> not_saturday >> dummy3
我刚刚想出了一个丑陋的解决方法:
dummy4 代表我实际需要运行的任务,dummy5 只是一个虚拟对象。
dummy3 仍然有触发规则 'one_success'
。
现在,如果没有上游故障,dummy3 和 dummy4 就会运行,如果当天不是星期六,dummy5 就会“运行”,如果那天是星期六,则会被跳过,这意味着 DAG 在这两种情况下都被标记为成功。
如果上游出现故障,则跳过 dummy3 和 dummy4,并将 dummy5 标记为 'upstream_failed'
,并将 DAG 标记为失败。
此变通办法使我的 DAG 按我希望的方式运行,但我仍然更喜欢没有一些 hacky 变通办法的解决方案。
最佳答案
您可以使用的一种解决方法是将 DAG 的第二部分放在 SubDAG 中,就像我在以下说明示例的代码中所做的那样:https://gist.github.com/cosenal/cbd38b13450b652291e655138baa1aba
它按预期工作,并且可以说它比您的解决方法更干净,因为您没有任何额外的辅助虚拟运算符。但是,您失去了平面结构,现在您必须放大 SubDag 才能看到内部结构的细节。
更一般的观察:在对您的 DAG 进行试验后,我得出结论,Airflow 需要类似 JoinOperator 的东西来替换您的 Dummy3 运算符。让我解释。您描述的行为来自这样一个事实,即 DAG 的成功仅基于最后一个运算符的成功(或跳过!)。
以下以“成功”状态结尾的 DAG 是支持上述声明的 MWE。
def python1():
raise Exception('Test failure')
dummy1 = PythonOperator(
task_id='python1',
python_callable=python1,
dag=dag
)
dummy2 = DummyOperator(
task_id='dummy2',
dag=dag,
trigger_rule='one_success'
)
dummy1 >> dummy2
只有当 直接 父级之一成功并且所有其他父级都被跳过时才触发的 JoinOperator 会很酷,而不必使用 trigger_rule
参数.
或者,可以解决您遇到的问题的方法是触发规则 all (success | skipped)
,您可以将其应用于 Dummy3。遗憾的是,我认为您还不能在 Airflow 上创建自定义触发规则。
编辑:在这个答案的第一个版本中,我声称触发规则 one_success
和 all_success
根据成功的程度触发 所有 DAG 中运算符的祖先,而不仅仅是直接父代。这与 documentation 不匹配事实上,它在以下实验中无效:https://gist.github.com/cosenal/b607825539aa0d308f10f3095e084fac
关于python - BranchPythonOperator 后的 Airflow 任务不会失败并正确成功,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51664755/
在Airflow中,我一直在使用“airflow run”和“airflow test”,但不完全理解它们有何不同。他们有什么区别? 最佳答案 我自己通读了文档,发现它是多么令人困惑。 Airflow
我使用 Airflow 已经有一段时间了,它是由一位同事创建的。最近我遇到了一些错误,这需要我更深入地了解如何修复 Airflow 中的某些问题。 我确实理解这三个进程是什么,但我只是不明白运行它们时
AIRFLOW_HOME=/path/to/my/airflow_home 我收到这个警告... >airflow trigger_dag python_dag3 /Users/alexryan/mi
有没有人报告过他们在他们的公司中让 Airflow 扩展了多少?我正在考虑实现 Airflow 来执行 5,000 多个任务,每个任务每小时运行一次,有一天可以将其扩展到 20,000 多个任务。在检
问题 :我想使用 Github 上最新版本的 Apache-Airflow 安装 apache-airflow 以及所有依赖项? 我怎样才能使用 pip 做到这一点? 在生产环境中使用它是否安全? 最
我们在 AWS ECS 上运行 Airflow,并将所有 DAG 捆绑在一个 Docker 镜像中。我们不时更新 DAGS,并部署新版本的 Docker Image。当我们这样做时,ECS 将终止正在
问题很简单。我需要限制 Airflow 网络用户仅查看和执行某些 DAG 和任务。 如果可能,我宁愿不使用 Kerberos也不是 OAuth . Multi-tenancy option 似乎是一个
我们正在使用 Airflow 2.00。我正在尝试实现一个做两件事的 DAG: 通过 API 触发报告 从源到目标下载报告。 任务 1 和任务 2 之间至少需要 2-3 小时的间隔。根据我的研究,我有
对于一项任务,有许多辅助任务 - 从文件/数据库中获取/保存属性、验证、审计。这些辅助方法并不耗时。 一个示例 DAG 流, fetch_data >> actual_processing >> va
有什么方法可以重新加载作业而不必重新启动服务器吗? 最佳答案 在airflow.cfg中,您具有以下两种配置来控制此行为: # after how much time a new DAGs shoul
我们可以通过将任务/dag 超时设置为 None 并手动触发其运行来使用 Airflow dag 来定义永无止境的作业(即具有无条件循环以消耗流数据的任务)吗?让 Airflow 监测永无止境的任务会
我是 Airflow 的新手,最近开始探索这个工具。我在 18.4 版本的 ubuntu 机器上安装了 1.10.10 版。从设置和安装的角度来看,一切正常,但是我在任何 DAG 中的任务都没有运行,
我主要看到Airflow被用于ETL / Bid数据相关的工作。我正在尝试将其用于业务工作流,其中用户操作将来会触发一组相关任务。其中某些任务可能需要根据某些其他用户操作来清除(删除)。 我认为最好的
我有一个 DAG,只要 FileSensor 检测到文件,它就会使用它,为每个文件生成任务,以 (1) 将文件移动到暂存区域,(2) 触发单独的 DAG 来处理文件。 FileSensor -> Mo
我需要手动或以编程方式执行的管道,可以使用 Airflow 吗?看起来现在每个工作流程都必须与时间表绑定(bind)。 最佳答案 只需在创建 DAG 时将 schedule_interval 设置为
所以这是一个愚蠢的想法...... 我在 Airflow 中创建了(许多)DAG...并且它有效...但是,我想以某种方式将其打包,以便我可以在不安装 Airflow 的情况下运行单个 DAG 运行;
我使用“pip install 'apache-airflow[statsd]' 安装了 airflow[statsd] 并安装了 statsd_exporter。现在我可以看到来自 Promethe
我们正在尝试将 MongoHook 和 GCSToLocalFilesystemOperator 导入到我们的 Airflow 项目中: docs for MongoHook docs for GCS
启动 Airflow 网络服务器时出现以下错误 balajee@Balajees-MacBook-Air.local:~$ Airflow 网络服务器 -p 8080 [2018-12-03 00:2
运行pip install airflow[postgres]命令后出现以下错误: > raise RuntimeError("By default one of Airflow's dependen
我是一名优秀的程序员,十分优秀!