- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个名为final
的任务,该任务具有多个上游连接。当上游上游之一被ShortCircuitOperator
跳过时,此任务也将被跳过。我不想跳过final
任务,因为它必须报告DAG成功。
为了避免被跳过,我使用了trigger_rule='all_done'
,但是仍然被跳过。
如果我使用BranchPythonOperator
而不是ShortCircuitOperator
,就不会跳过final
任务。即使不是最佳选择,分支工作流似乎也是一个解决方案,但是现在final
将不再考虑上游任务的失败。
如何使它仅在成功运行或跳过上游时运行?
短路DAG示例:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from datetime import datetime
from random import randint
default_args = {
'owner': 'airflow',
'start_date': datetime(2018, 8, 1)}
dag = DAG(
'shortcircuit_test',
default_args=default_args,
schedule_interval='* * * * *',
catchup=False)
def shortcircuit_fn():
return randint(0, 1) == 1
task_1 = DummyOperator(dag=dag, task_id='task_1')
task_2 = DummyOperator(dag=dag, task_id='task_2')
work = DummyOperator(dag=dag, task_id='work')
short = ShortCircuitOperator(dag=dag, task_id='short_circuit', python_callable=shortcircuit_fn)
final = DummyOperator(dag=dag, task_id="final", trigger_rule="all_done")
task_1 >> short >> work >> final
task_1 >> task_2 >> final
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime
from random import randint
default_args = {
'owner': 'airflow',
'start_date': datetime(2018, 8, 1)}
dag = DAG(
'branch_test',
default_args=default_args,
schedule_interval='* * * * *',
catchup=False)
# these two are only here to protect tasks from getting skipped as direct dependencies of branch operator
to_do_work = DummyOperator(dag=dag, task_id='to_do_work')
to_skip_work = DummyOperator(dag=dag, task_id='to_skip_work')
def branch_fn():
return to_do_work.task_id if randint(0, 1) == 1 else to_skip_work.task_id
task_1 = DummyOperator(dag=dag, task_id='task_1')
task_2 = DummyOperator(dag=dag, task_id='task_2')
work = DummyOperator(dag=dag, task_id='work')
branch = BranchPythonOperator(dag=dag, task_id='branch', python_callable=branch_fn)
final = DummyOperator(dag=dag, task_id="final", trigger_rule="all_done")
task_1 >> branch >> to_do_work >> work >> final
branch >> to_skip_work >> final
task_1 >> task_2 >> final
最佳答案
我最终基于原始的开发了定制的ShortCircuitOperator:
class ShortCircuitOperator(PythonOperator, SkipMixin):
"""
Allows a workflow to continue only if a condition is met. Otherwise, the
workflow "short-circuits" and downstream tasks that only rely on this operator
are skipped.
The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
condition and short-circuits the workflow if the condition is False. Any
downstream tasks that only rely on this operator are marked with a state of "skipped".
If the condition is True, downstream tasks proceed as normal.
The condition is determined by the result of `python_callable`.
"""
def find_tasks_to_skip(self, task, found_tasks=None):
if not found_tasks:
found_tasks = []
direct_relatives = task.get_direct_relatives(upstream=False)
for t in direct_relatives:
if len(t.upstream_task_ids) == 1:
found_tasks.append(t)
self.find_tasks_to_skip(t, found_tasks)
return found_tasks
def execute(self, context):
condition = super(ShortCircuitOperator, self).execute(context)
self.log.info("Condition result is %s", condition)
if condition:
self.log.info('Proceeding with downstream tasks...')
return
self.log.info(
'Skipping downstream tasks that only rely on this path...')
tasks_to_skip = self.find_tasks_to_skip(context['task'])
self.log.debug("Tasks to skip: %s", tasks_to_skip)
if tasks_to_skip:
self.skip(context['dag_run'], context['ti'].execution_date,
tasks_to_skip)
self.log.info("Done.")
关于 Airflow : Run a task when some upstream is skipped by shortcircuit,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51725746/
我正在做一个项目,我需要一个 skip-gram 模型的预训练向量。我听说还有一个名为 skip-n-gram 模型的变体可以提供更好的结果。 我想知道自己训练模型需要什么?因为我只需要它们来为我的模
我正在尝试实现 EF 提供的分页功能。我以为我只需要将简单的 Skip() 和 Take() 添加到我的查询中,但后来我收到了这条消息: Error 4 'System.Linq.IOrderedQu
到目前为止,我有一个语法一直在使用标准的 boost::spirit::ascii::space/boost::spirit::ascii::space_type 船长。 我有一些使用船长的规则和一些
我正在处理动态数据。 创建动态模型并在global.asax中注册后,如 DefaultModel.RegisterContext(typeof(masterEntities1),new Contex
在 Visual Studio 2013 Update 1 中使用 Entity Framework 6.0.2 和 .NET 4.5.1 DbContext连接到 SQL Server: 我有一个很
我正在创建一个支持动态数据的 ASP.NET 网站。当我从头开始(从 VS 中的模板)创建动态网站时,一切正常。但是,当我尝试添加动态实体 (.edmx) 文件并运行应用程序时,出现以下错误: “Sk
我们可以跳过一个可选参数并为跳过的参数赋值吗? 例如我有一个函数: public function Dialog(message:String,title:String="Note",dialogsi
我创建了一个函数来从我的 firestore 集合中提取一个随机文档,然后返回一个 FIRQuery。得到它后,我创建了一个模型文件来解析我的 firestore 文档中的任何 FIRQuery 数据
我对编程比较陌生,我想尝试看看如何在 Visual Studio 2019 中制作基本的 Windows 桌面应用程序。由于我为按钮添加了代码,所以 C2360 和 C2361 错误不断弹出,但我没有
我正在使用 Data Skipping Indexes clickhouse 中的功能,我对它的用法感到困惑。如果我在创建表时添加数据跳过索引,如下所示: CREATE TABLE MyTable (
我不确定Maven指令-Dmaven.test.skip.exec和-Dmaven.test.skip=true -DskipTests之间的区别是什么。两者似乎都抑制了测试周期。 最佳答案 “mav
假设我有一些神奇的分页黑盒类,它使用 pageIndex 和 pageSize 检索数据,如下所示: public class PaginatedList { // ... // Ch
我计划使用 Bowerjs 来管理前端依赖项,并使用 gulp.js 来连接、缩小和编译前端代码。因此,我想跳过 Rails Assets 管道,包括 sprocket。我运行以下命令来跳过 Asse
我计划使用 Bowerjs 来管理前端依赖项,并使用 gulp.js 来连接、缩小和编译前端代码。因此,我想跳过 Rails Assets 管道,包括 sprocket。我运行以下命令来跳过 Asse
我正在索引页上的 MVC 中进行分页......在这一行我得到了错误 return View( employee.ToPagedList(Page ?? 1,3)); 这里是索引方法 public
我正在搜索关于跳过连接的科学工作。 每个人都在谈论通过网络改进梯度流,这感觉很有意义。但是我很想理解和阅读归根结底的数学和代码意味着什么以及如何在 tensorflow 中深入实现它。 resnet
添加 skip规则不符合我的期望。这是由逗号和空格分隔的一对标记的语法。我制作了一个版本,其中逗号标记为 skip ,还有一个不是: grammar Commas; COMMA: '
将Jupyter用于幻灯片时,是否有默认方法将幻灯片类型设置为跳过新单元格的? 最佳答案 我针对您所要求的解决方法。 创建一个空的“跳过”幻灯片并键入: 退出 C 伏特 伏特 伏特 复制并粘贴许多空的
我正在尝试使用更新 svn update --username myusername https://my.svn.address 但是,我只是收到一条“已跳过”消息? 最佳答案 我猜你遇到了这种类型
Flash 的 MovieClip 时间轴的创建方式可以跳过帧以保持动画平滑度和音频同步。 我的问题是,如果帧上有 ActionScript,是否有可能跳过该帧,从而不调用脚本? 或者有脚本的帧永远不
我是一名优秀的程序员,十分优秀!