gpt4 book ai didi

python - 任务组中的 Airflow 2 Xcom

转载 作者:行者123 更新时间:2023-12-02 18:12:15 25 4
gpt4 key购买 nike

我在 TaskGroup 中有两个任务需要提取 xcom 值以提供 job_flow_id 和 step_id。这是代码:

  with TaskGroup('execute_my_steps') as execute_my_steps: 

config = {some dictionary}
dependencies = {another dictionary}

task_id = 'execute_spark_job_step'
task_name = 'spark_job'

add_step = EmrAddStepsOperator(
task_id=task_id,
job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
steps=create_emr_step(args=config, d=dependencies),
aws_conn_id='aws_default',
retries=3,
dag=dag
)

wait_for_step = EmrStepSensor(
task_id='wait_for_' + task_name + '_step',
job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='" + task_id + "', key='return_value') }}",
retries=3,
dag=dag,
mode='reschedule'
)

add_step >> wait_for_step

问题是 step_id 没有正确呈现。 UI 渲染模板中的 wait_for_step 值显示为 'None',但是,execute_spark_job_step 的 xcom return_value 在那里(这是 emr step_id ).

wait_for_step 渲染模板: wait_for_step rendered template

execute_spark_job_step xcom: execute_spark_job_step xcom

当我删除 TaskGroup 时,它呈现正常并且该步骤会等待直到作业进入完成状态。

我需要将其加入任务组,因为我将遍历更大的配置文件并创建多个步骤。

为什么这行不通?我需要一个嵌套的任务组吗?我尝试在没有上下文管理器的情况下使用 TaskGroup,但仍然没有成功。

最佳答案

长话短说:

您的问题发生是因为 id 不是 task_id 它是 group_id.task_id所以你的代码应该是:

task_ids=f"execute_my_steps.{ task_id }"

=>

    step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids=f"execute_my_steps.{ task_id }", key='return_value') }}",

发生原因的解释:

当任务分配给 TaskGroup 时,任务的 id 不再是 task_id,而是变成 group_id.task_id 以反射(reflect)这种关系。在 Airflow 中,task_id 是唯一的,但是当您使用 TaskGroup 时,您可以在不同的 TaskGroup 中设置相同的 task_id。如果此行为不是您想要的,您可以通过在任务组中设置 prefix_group_id=False 来禁用它:

with TaskGroup(
group_id='execute_my_steps',
prefix_group_id=False
) as execute_my_steps:

这样做,您的代码将无需更改即可工作。 task_id 只是 task_id 没有 group_id 前缀。请注意,这也意味着您有责任确保您的 DAG 中没有重复的 task_id。

关于python - 任务组中的 Airflow 2 Xcom,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72134364/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com