gpt4 book ai didi

airflow - 如何跳过 Airflow 上的任务?

转载 作者:行者123 更新时间:2023-12-04 18:56:19 24 4
gpt4 key购买 nike

我想了解 Airflow 是否支持跳过 DAG 中的任务以进行临时执行?

假设我的 DAG 图如下所示:
任务 1 > 任务 2 > 任务 3 > 任务 4

我想从 task3 手动启动我的 DAG,这样做的最佳方法是什么?

我读过 ShortCircuitOperator ,但我正在寻找更多的临时解决方案,一旦触发执行就可以应用。

谢谢!

最佳答案

您可以合并 SkipMixin ShortCircuitOperator uses under the hood跳过下游任务。

from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin)

@apply_defaults
def __init__(self,
condition,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.condition = condition

def execute(self, context):

if self.condition:
self.log.info('Proceeding with downstream tasks...')
return

self.log.info('Skipping downstream tasks...')

downstream_tasks = context['task'].get_flat_relatives(upstream=False)

self.log.debug("Downstream task_ids %s", downstream_tasks)

if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

self.log.info("Done.")

关于airflow - 如何跳过 Airflow 上的任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52190926/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com