I am trying to access XCOM value while learning Airflow, but every time, I get None returned.
我试图在学习气流的同时访问XCOM值,但每次都得不到任何返回。
Below is the DAG code. I have also set the dependency, still no luck.
以下是DAG代码。我也设定了依赖,还是没有运气。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'ashish',
'retries': '2',
'retry_delay': timedelta(minutes=2)
}
# Basic Python Function
"""
def greet():
print("This is an example of the Python Operator DAG.")
# Python Function Accepting Parameters
def parameter_greet(name):
print(f"Hi {name}")
"""
# Python Function returning value
def get_name():
return 'Ashish'
def get_name_xcom(ti):
name_xcom = ti.xcom_pull(task_ids='get_name')
print(f"Hi {name_xcom}. This was pulled from XCOM.")
with DAG(
dag_id='Python_Operator',
default_args=default_args,
description='Example for Python Operator',
start_date=datetime(2023, 8, 30, 2),
schedule_interval='@daily'
) as dag:
"""
task1 = PythonOperator(
task_id='Greet',
python_callable=greet
)
task2 = PythonOperator(
task_id='Greet_Parameter',
python_callable=parameter_greet,
op_kwargs={'name': 'Ashish'}
)
"""
task3 = PythonOperator(
task_id='XCOM_Example',
python_callable=get_name
)
task4 = PythonOperator(
task_id='Greet_with_XCOM',
python_callable=get_name_xcom
)
# task1
# task2
task3 >> task4
Tried setting the value using ti.xcom.push and then retrieving it using ti.xcom.pull but that is not working either.
尝试使用ti.xcom.ush设置该值,然后使用ti.xcom.ull检索该值,但也不起作用。
更多回答
优秀答案推荐
The task_ids
value in xcom_pull()
should be the actual task_id
of the task which pushed the XCom rather than the name of the callable.
XCOM_Pull()中的TASK_IDS值应该是推送XCOM的任务的实际TASK_ID,而不是可调用对象的名称。
You'll need to update your get_name_xcom
your function to this:
您需要将GET_NAME_XCOM函数更新为:
def get_name_xcom(ti):
name_xcom = ti.xcom_pull(task_ids='XCOM_Example')
print(f"Hi {name_xcom}. This was pulled from XCOM.")
更多回答
Thank you @Josh Fell, this worked like a charm.
“谢谢你,”乔什·费尔说,“这就像一个护身符。”
我是一名优秀的程序员,十分优秀!