gpt4 book ai didi

python-3.x - 如何在 Airflow 中的单个 Dag 上处理不同的任务间隔?

转载 作者:行者123 更新时间:2023-12-03 12:42:51 40 4
gpt4 key购买 nike

我有一个带有多个任务的单个 dag,其结构简单,任务 A、B 和 C 可以在没有任何依赖关系的情况下在开始时运行,但任务 D 依赖于 A 不,这是我的问题:

任务 A、B 和 C 每天运行,但我需要在 A 成功后每周运行任务 D。我该如何设置这个 dag?

更改任务的 schedule_interval 是否有效?这个问题有什么最佳实践吗?

谢谢你的帮助。

最佳答案

您可以使用 ShortCircuitOperator去做这个。

import airflow
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG


args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'schedule_interval': '0 10 * * *'
}

dag = DAG(dag_id='example', default_args=args)

a = DummyOperator(task_id='a', dag=dag)
b = DummyOperator(task_id='b', dag=dag)
c = DummyOperator(task_id='c', dag=dag)
d = DummyOperator(task_id='d', dag=dag)

def check_trigger(execution_date, **kwargs):
return execution_date.weekday() == 0

check_trigger_d = ShortCircuitOperator(
task_id='check_trigger_d',
python_callable=check_trigger,
provide_context=True,
dag=dag
)

a.set_downstream(b)
b.set_downstream(c)
a.set_downstream(check_trigger_d)
# Perform D only if trigger function returns a true value
check_trigger_d.set_downstream(d)

关于python-3.x - 如何在 Airflow 中的单个 Dag 上处理不同的任务间隔?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48440262/

40 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com