gpt4 book ai didi

Airflow - 在同一个 DAG 中使用 TaskGroup 和 PythonBranchOperator

转载 作者:行者123 更新时间:2023-12-04 14:57:07 25 4
gpt4 key购买 nike

我目前正在使用 Airflow Taskflow API 2.0。我遇到了结合使用 TaskGroup 和 BranchPythonOperator 的问题。
下面是我的代码:

import airflow
from airflow.models import DAG
from airflow.decorators import task, dag
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.python import task, get_current_context
from random import randint
from airflow.utils.task_group import TaskGroup


default_args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}

@task
def dummy_task():
return {}


@task
def task_b():
return {}

@task
def task_c():
return {}

def final_step():
return {}

def get_tasks(**kwargs):
task = 'task_a'

return task


with DAG(dag_id='branch_dag',
default_args=default_args,
schedule_interval=None) as dag:

with TaskGroup('task_a') as task_a:
obj = dummy_task()

tasks = BranchPythonOperator(
task_id='check_api',
python_callable=get_tasks,
provide_context=True
)

final_step = PythonOperator(
task_id='final_step',
python_callable=final_step,
trigger_rule='one_success'
)

b = task_b()
c = task_c()

tasks >> task_a >> final_step
tasks >> b >> final_step
tasks >> c >> final_step
当我触发此 DAG 时,我在 check_api 任务中收到以下错误:
Airflow .异常.TaskNotFound:找不到任务task_a
是否可以将 TaskGroup 与 BranchPythonOperator 结合使用并使其正常工作?
谢谢,

最佳答案

BranchPythonOperator预计返回task_ids您需要更改 get_tasks功能:

def get_tasks(**kwargs):
task = 'task_a.dummy_task'
return task
enter image description here

关于Airflow - 在同一个 DAG 中使用 TaskGroup 和 PythonBranchOperator,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67720424/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com