gpt4 book ai didi

airflow - 如果任务 1 失败,如何在运行时添加任务

转载 作者:行者123 更新时间:2023-12-01 00:19:35 24 4
gpt4 key购买 nike

如果任务 1 成功,我想执行任务 2,如果任务 1 失败,我想运行任务 3,并希望在需要时分配另一个流程。

基本上我想在没有 ssh 运算符(operator)的情况下在 Airflow 中运行条件任务。

from airflow import DAG
from airflow.operators import PythonOperator,BranchPythonOperator
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airflow.models import Variable


def t2_error_task(context):
instance = context['task_instance']
if instance.task_id == "performExtract":
print ("Please implement something over this")
task_3 = PythonOperator(
task_id='performJoin1',
python_callable=performJoin1, # maybe main?
dag = dag
)
dag.add_task(task_3)
with DAG(
'manageWorkFlow',
catchup=False,
default_args={
'owner': 'Mannu',
'start_date': datetime(2018, 4, 13),
'schedule_interval':None,
'depends_on_past': False,
},
) as dag:
task_1 = PythonOperator(
task_id='performExtract',
python_callable=performExtract,
on_failure_callback=t2_error_task,
depends_on_past=True
)
task_2 = PythonOperator(
task_id='printSchemas',
depends_on_past=True,
python_callable=printSchemaAll, # maybe main?
)
task_2.set_upstream(task_1)

最佳答案

Airflow 不支持根据执行时状态动态添加任务。为了获得所需的行为,您应该添加 task_3到你的 dag 但改变它的 trigger_rule all_failed .在这种情况下,当 task_1 时,任务将被标记为已跳过。成功,但它会在失败时被执行。

关于airflow - 如果任务 1 失败,如何在运行时添加任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49893603/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com