gpt4 book ai didi

python - Airflow、XCom 和多个 task_id

转载 作者:太空宇宙 更新时间:2023-11-03 21:25:08 25 4
gpt4 key购买 nike

指定多个任务时,task_ids 如何工作?

在这个特定的代码示例中,我希望从元组 (5555,22222) 中的两个任务中检索 load_cycle_id_2,但结果却是 (None, 22222)。

这是为什么呢?

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

args = {
'owner': 'airflow',
'start_date': datetime.now(),
'provide_context': True
}

demo_dag = DAG(dag_id='first', start_date=datetime.now(), schedule_interval='@once',default_args=args)

def push_load_id(**kwargs):
kwargs['ti'].xcom_push(key='load_cycle_id_2',value=22222)
kwargs['ti'].xcom_push(key='load_cycle_id_3',value=44444)

def another_push_load_id(**kwargs):
kwargs['ti'].xcom_push(key='load_cycle_id_2',value=5555)
kwargs['ti'].xcom_push(key='anotherload_cycle_id_3',value=6666)

def pull_load_id(**kwargs):
ti = kwargs['ti'].xcom_pull(key='load_cycle_id_2', task_ids=['another_push_load_id','push_load_id'])
print(ti)

push_operator = PythonOperator(task_id='push_load_id', python_callable=push_load_id, dag=demo_dag)
pull_operator = PythonOperator(task_id='pull_load_id', python_callable=pull_load_id, dag=demo_dag)

push_operator >> pull_operator

最佳答案

您的 dags 仅运行 push_load_idpull_load_id 函数。您没有创建使用 another_push_load_id 函数的运算符。

代码的结尾应如下所示:

push_operator = PythonOperator(task_id='push_load_id', python_callable=push_load_id, dag=demo_dag)
another_push_operator = PythonOperator(task_id='push_load_id', python_callable= another_push_load_id, dag=demo_dag)
pull_operator = PythonOperator(task_id='pull_load_id', python_callable=pull_load_id, dag=demo_dag)

push_operator >> another_push_operator >> pull_operator

关于python - Airflow、XCom 和多个 task_id,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53896957/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com