Airflow: Run a task when some upstream is skipped by ShortCircuitOperator

Reposted. Author: 行者123. Updated: 2023-12-03 15:59:48

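I have a task, which I'll call final, that has multiple upstream connections. When one of its upstream tasks is skipped by a ShortCircuitOperator, this task is skipped as well. I don't want final to be skipped, because it has to report on DAG success.

To keep it from being skipped I set trigger_rule='all_done', but it is still skipped.

If I use a BranchPythonOperator instead of the ShortCircuitOperator, final is not skipped. A branching workflow looks like a possible solution, even if not an optimal one, but then final no longer respects failures of upstream tasks.

How do I get it to run only when its upstream tasks have succeeded or been skipped?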
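The difference between what trigger_rule='all_done' gives and what the question asks for can be written as two predicates over upstream states. This is only a plain-Python sketch, not Airflow code: the state strings and the function names `all_done` and `succeeded_or_skipped` are assumptions made for illustration, and real Airflow has further states (for example upstream_failed) that are left out here.

```python
# Simplified, hypothetical model of upstream task states (not Airflow's own API).
SUCCESS, SKIPPED, FAILED = "success", "skipped", "failed"
FINISHED = {SUCCESS, SKIPPED, FAILED}


def all_done(upstream_states):
    """Model of 'all_done': run once every upstream has finished, whatever the outcome."""
    return all(state in FINISHED for state in upstream_states)


def succeeded_or_skipped(upstream_states):
    """The rule asked for: every upstream finished and none of them failed."""
    return all(state in {SUCCESS, SKIPPED} for state in upstream_states)


# One upstream failed: 'all_done' would still run final, the desired rule would not.
print(all_done([SUCCESS, FAILED]), succeeded_or_skipped([SUCCESS, FAILED]))
# One upstream skipped: both rules would let final run.
print(all_done([SUCCESS, SKIPPED]), succeeded_or_skipped([SUCCESS, SKIPPED]))
```

Airflow versions that ship a none_failed trigger rule express roughly the second predicate, but a trigger rule only helps if the task is not already marked as skipped by another operator.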

Example DAG with short circuit:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from datetime import datetime
from random import randint

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 8, 1)}

dag = DAG(
    'shortcircuit_test',
    default_args=default_args,
    schedule_interval='* * * * *',
    catchup=False)

def shortcircuit_fn():
    return randint(0, 1) == 1

task_1 = DummyOperator(dag=dag, task_id='task_1')
task_2 = DummyOperator(dag=dag, task_id='task_2')

work = DummyOperator(dag=dag, task_id='work')
short = ShortCircuitOperator(dag=dag, task_id='short_circuit', python_callable=shortcircuit_fn)
final = DummyOperator(dag=dag, task_id="final", trigger_rule="all_done")

task_1 >> short >> work >> final
task_1 >> task_2 >> final

(Figure: DAG with ShortCircuitOperator)
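A likely reason trigger_rule='all_done' has no effect here: as far as I understand the Airflow 1.x operator, when the condition is false it marks every task downstream of itself as skipped directly, so the trigger rule on final never gets a say. The sketch below models that on the short-circuit example DAG in plain Python. The `DOWNSTREAM` mapping and the `all_downstream` helper are hypothetical names for this illustration, not Airflow APIs.

```python
from collections import deque

# Edges of the short-circuit example DAG: task_id -> direct downstream task_ids.
DOWNSTREAM = {
    "task_1": ["short_circuit", "task_2"],
    "short_circuit": ["work"],
    "work": ["final"],
    "task_2": ["final"],
    "final": [],
}


def all_downstream(task_id, edges):
    """Collect every task reachable downstream of task_id, breadth-first."""
    seen = []
    queue = deque(edges[task_id])
    while queue:
        current = queue.popleft()
        if current not in seen:
            seen.append(current)
            queue.extend(edges[current])
    return seen


# Everything below short_circuit is collected, including final,
# even though final also depends on task_2.
skipped = all_downstream("short_circuit", DOWNSTREAM)
print(skipped)
```

Under this model final ends up in the skip list purely because it is reachable from short_circuit, regardless of how task_2 finished.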

Example DAG with branch:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime
from random import randint

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 8, 1)}

dag = DAG(
    'branch_test',
    default_args=default_args,
    schedule_interval='* * * * *',
    catchup=False)

# these two are only here to protect tasks from getting skipped as direct dependencies of branch operator
to_do_work = DummyOperator(dag=dag, task_id='to_do_work')
to_skip_work = DummyOperator(dag=dag, task_id='to_skip_work')

def branch_fn():
    return to_do_work.task_id if randint(0, 1) == 1 else to_skip_work.task_id

task_1 = DummyOperator(dag=dag, task_id='task_1')
task_2 = DummyOperator(dag=dag, task_id='task_2')

work = DummyOperator(dag=dag, task_id='work')
branch = BranchPythonOperator(dag=dag, task_id='branch', python_callable=branch_fn)
final = DummyOperator(dag=dag, task_id="final", trigger_rule="all_done")

task_1 >> branch >> to_do_work >> work >> final
branch >> to_skip_work >> final
task_1 >> task_2 >> final

(Figure: DAG with BranchPythonOperator)

Best answer

I ended up developing a custom ShortCircuitOperator based on the original one:

# Imports for Airflow 1.x, matching the import style used in the question
from airflow.models import SkipMixin
from airflow.operators.python_operator import PythonOperator


class ShortCircuitOperator(PythonOperator, SkipMixin):
    """
    Allows a workflow to continue only if a condition is met. Otherwise, the
    workflow "short-circuits" and downstream tasks that only rely on this operator
    are skipped.

    The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
    condition and short-circuits the workflow if the condition is False. Any
    downstream tasks that only rely on this operator are marked with a state of "skipped".
    If the condition is True, downstream tasks proceed as normal.

    The condition is determined by the result of `python_callable`.
    """

    def find_tasks_to_skip(self, task, found_tasks=None):
        # Walk downstream, collecting only tasks that have exactly one upstream;
        # a task with several upstreams stops the walk along that path.
        if found_tasks is None:
            found_tasks = []
        direct_relatives = task.get_direct_relatives(upstream=False)
        for t in direct_relatives:
            if len(t.upstream_task_ids) == 1:
                found_tasks.append(t)
                self.find_tasks_to_skip(t, found_tasks)
        return found_tasks

    def execute(self, context):
        condition = super(ShortCircuitOperator, self).execute(context)
        self.log.info("Condition result is %s", condition)

        if condition:
            self.log.info('Proceeding with downstream tasks...')
            return

        self.log.info(
            'Skipping downstream tasks that only rely on this path...')

        tasks_to_skip = self.find_tasks_to_skip(context['task'])
        self.log.debug("Tasks to skip: %s", tasks_to_skip)

        if tasks_to_skip:
            self.skip(context['dag_run'], context['ti'].execution_date,
                      tasks_to_skip)

        self.log.info("Done.")

This operator makes sure that a downstream task relying on multiple paths is not skipped just because one of those paths was skipped.
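The traversal in the answer can be tried without an Airflow installation. The sketch below re-implements the same idea on a plain dictionary that mirrors the short-circuit DAG from the question. `DOWNSTREAM`, `upstream_count`, and this standalone `find_tasks_to_skip` are hypothetical stand-ins for illustration; the real operator works on Airflow task objects instead.

```python
# Edges of the short-circuit example DAG: task_id -> direct downstream task_ids.
DOWNSTREAM = {
    "task_1": ["short_circuit", "task_2"],
    "short_circuit": ["work"],
    "work": ["final"],
    "task_2": ["final"],
    "final": [],
}


def upstream_count(task_id, edges):
    """Number of tasks that list task_id as a direct downstream task."""
    return sum(task_id in children for children in edges.values())


def find_tasks_to_skip(task_id, edges, found=None):
    """Collect downstream tasks reachable only through single-upstream links."""
    if found is None:
        found = []
    for child in edges[task_id]:
        # A child with more than one upstream is kept, and the walk stops there.
        if upstream_count(child, edges) == 1:
            found.append(child)
            find_tasks_to_skip(child, edges, found)
    return found


to_skip = find_tasks_to_skip("short_circuit", DOWNSTREAM)
print(to_skip)
```

In this model work is skipped because short_circuit is its only upstream, while final is left alone because it also depends on task_2, so its own trigger rule decides whether it runs. One consequence of stopping at the first multi-upstream task is that anything further down that path is not skipped by the operator either.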

Source: the original question on Stack Overflow, "Airflow: Run a task when some upstream is skipped by shortcircuit": https://stackoverflow.com/questions/51725746/
