gpt4 book ai didi

python - Airflow :如何将 xcom 从父 dag 传递到 subdag

转载 作者:行者123 更新时间:2023-12-04 03:11:22 25 4
gpt4 key购买 nike

考虑到将值推送到 xcom 的父 dag,如何从 subdag 中检索 dag?

我试过的:

#parent_dag.py

PARENT_DAG_NAME = "MyParentDag"
CHILD_DAG_NAME = "MyChildDag"

main_dag = DAG(
dag_id=PARENT_DAG_NAME,
schedule_interval="@hourly",
start_date=DAG_START_DATE
)


def push_value(**kwargs):
''' push into Xcom '''
return [1, 2]

t1 = PythonOperator(task_id='push_value',
python_callable=push_value,
retries=3,
dag=main_dag)

subdag_1 = SubDagOperator(
subdag=Sub_Dag1(
PARENT_DAG_NAME,
CHILD_DAG_NAME,
main_dag.start_date,
main_dag.schedule_interval,
"'{{ ti.xcom_pull(task_ids='push_value', dag_id='" + PARENT_DAG_NAME + "' }}'"
),
task_id=CHILD_DAG_NAME,
dag=main_dag,
)
t1 >> subdag_1

和 child subdag:
#subdag1.py


def use_pushed_val(pushed_val, ds, **kwargs):
log.info(pushed_val)
return pushed_val

def Sub_Dag1(parent_dag_name, child_dag_name, start_date, schedule_interval, pushed_val):
dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date,
)

childTask = PythonOperator(
task_id='child_task',
python_callable=use_pushed_val,
op_kwargs = {'pushed_val' : pushed_val},
provide_context=True,
dag=dag
)

return dag

而不是 child subdag 来记录和返回 [1,2] ,它返回字符串 '{{ ti.xcom_pull(task_ids='push_value', dag_id='MyParentDag' }}'

最佳答案

我看到您已经设置了 provide_context=True,所以很好。这就是我使用 **context 参数在父/子 dag 之间传递变量的方式。

def push_value(**context):
context['ti'].xcom_push(key='my_key', value='my_value')

def use_pushed_val(**context):
value_from_parent = context['ti'].xcom_pull(task_ids=t1.task_id, key='my_key')

关于python - Airflow :如何将 xcom 从父 dag 传递到 subdag,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54777603/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com