python - Airflow Branch Operator and Task Group Invalid Task ID

Reposted. Author: 行者123. Updated: 2023-12-02 18:11:20

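I have a simple DAG that uses a branch operator to check whether y is False. If it is, the DAG should go to the say_goodbye task group. If y is True, it should skip the group and go to finish_dag_step. Here is the DAG: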

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup


def which_step() -> str:
    # Decide which branch to follow after run_which_step.
    y = False
    if not y:
        return 'say_goodbye'
    else:
        return 'finish_dag_step'


with DAG(
    'my_test_dag',
    start_date = datetime(2022, 5, 14),
    schedule_interval = '0 0 * * *',
    catchup = True) as dag:

    say_hello = BashOperator(
        task_id = 'say_hello',
        retries = 3,
        bash_command = 'echo "hello world"'
    )

    run_which_step = BranchPythonOperator(
        task_id = 'run_which_step',
        python_callable = which_step,
        retries = 3,
        retry_exponential_backoff = True,
        retry_delay = timedelta(seconds = 5)
    )

    # Task group containing the tasks step_0 and step_1.
    with TaskGroup('say_goodbye') as say_goodbye:
        for i in range(0,2):
            step = BashOperator(
                task_id = 'step_' + str(i),
                retries = 3,
                bash_command = 'echo "goodbye world"'
            )

            step

    finish_dag_step = BashOperator(
        task_id = 'finish_dag_step',
        retries = 3,
        bash_command = 'echo "dag is finished"'
    )
    say_hello >> run_which_step
    run_which_step >> say_goodbye >> finish_dag_step
    run_which_step >> finish_dag_step
    finish_dag_step

When the DAG reaches run_which_step, I get the following error:

[Two screenshots of the error; images not available]

I don't understand what is causing this. What is going on?

Best answer

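The branch callable has to return task IDs, but 'say_goodbye' is the ID of a TaskGroup, not of a task, so it is rejected as an invalid task ID. You therefore have to refer to the tasks inside the group by their task_id, which is the TaskGroup's name and the task's id joined by a dot (task_group.task_id).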

Your branch function should return something like this:

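def branch():
    # `condition` stands for whatever check decides the branch.
    if condition:
        # Full task IDs of the tasks inside the group: "<group_id>.<task_id>".
        return [f'task_group.task_{i}' for i in range(0,2)]
    return 'default'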
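
Applied to the DAG in the question, the same idea might look like the sketch below. It assumes the group keeps its default ID prefixing, so the tasks are addressed as say_goodbye.step_0 and say_goodbye.step_1. The constants GROUP_ID and STEP_COUNT are hypothetical names introduced only for this illustration.

```python
from typing import List, Union

# Hypothetical constants for illustration; they mirror the question's DAG.
GROUP_ID = "say_goodbye"  # name passed to TaskGroup(...)
STEP_COUNT = 2            # the group creates step_0 and step_1


def which_step() -> Union[List[str], str]:
    """Return the full task IDs to follow, not the TaskGroup ID."""
    y = False
    if not y:
        # "<group_id>.<task_id>" for every task inside the group
        return [f"{GROUP_ID}.step_{i}" for i in range(STEP_COUNT)]
    return "finish_dag_step"
```

With y fixed to False as in the question, the callable returns the two prefixed task IDs instead of the bare group name.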

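Rather than returning a list of task IDs this way, though, the easiest approach is probably to use a DummyOperator as the entry point of the TaskGroup and place the group's tasks downstream of that DummyOperator.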
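
A minimal sketch of that entry-point approach, assuming Airflow 2.x import paths. The task ID `start` and the factory function `build_dag` are hypothetical names chosen for this illustration. The Airflow imports sit inside the factory only so that the branch function can be exercised without Airflow installed.

```python
from datetime import datetime


def which_step() -> str:
    y = False
    if not y:
        # Full task ID of the group's entry task: "<group_id>.<task_id>".
        return "say_goodbye.start"
    return "finish_dag_step"


def build_dag():
    # Airflow 2.x import paths (assumed). Newer releases provide
    # EmptyOperator as the replacement for DummyOperator.
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.dummy import DummyOperator
    from airflow.operators.python import BranchPythonOperator
    from airflow.utils.task_group import TaskGroup

    with DAG(
        "my_test_dag",
        start_date=datetime(2022, 5, 14),
        schedule_interval="0 0 * * *",
        catchup=True,
    ) as dag:
        run_which_step = BranchPythonOperator(
            task_id="run_which_step", python_callable=which_step
        )

        with TaskGroup("say_goodbye") as say_goodbye:
            # Single entry point that the branch callable can name.
            start = DummyOperator(task_id="start")
            for i in range(2):
                step = BashOperator(
                    task_id=f"step_{i}", bash_command='echo "goodbye world"'
                )
                # Every task in the group runs downstream of the entry point.
                start >> step

        finish_dag_step = BashOperator(
            task_id="finish_dag_step", bash_command='echo "dag is finished"'
        )

        run_which_step >> say_goodbye >> finish_dag_step
        run_which_step >> finish_dag_step

    return dag
```

In a DAG file, a module-level `dag = build_dag()` would make the DAG visible to the scheduler. The branch callable then only needs to name one task, however many tasks the group contains.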

Regarding python - Airflow Branch Operator and Task Group Invalid Task ID, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/72306836/
