I'm trying out the TaskFlow API and want to print values returned via XCom, but I can't get it to work. Could anyone tell me where I'm going wrong?
from datetime import datetime, timedelta
from airflow.decorators import dag, task

default_args = {
    'owner': 'ashish',
    'retries': '2',
    'retry_delay': timedelta(minutes=2)
}

@dag(dag_id='DAG_with_TaskFlow_API',
     default_args=default_args,
     description='Example for Task Flow API',
     start_date=datetime(2023, 8, 30, 2),
     schedule_interval='@daily')
def hello():
    # Setting the value normally
    @task()
    def get_name():
        return 'Ashish'

    # Passing multiple values
    @task()
    def get_fullname(multiple_outputs=True):
        return {'firstname': 'Ashish', 'lastname': 'Inamdar'}

    @task()
    def greet(name):
        print(f"Hello {name}")

    @task()
    def greet_multiple(firstname, lastname):
        print(f"Hi {firstname} {lastname}")

    name = get_name()
    greet(name=name)
    fullname_dict = get_fullname()
    greet_multiple(fullname_dict['firstname'], fullname_dict['lastname'])

taskflow_api_dag = hello()
I have read and tried the same approach shown in another answer by Josh Fell, cited at the link: issue with passing return value from a task as an argument to another task.
Below are the Airflow logs for the greet_multiple task:
*** Found local files:
*** * /opt/airflow/logs/dag_id=DAG_with_TaskFlow_API/run_id=manual__2023-09-10T05:23:33.215377+00:00/task_id=greet_multiple/attempt=3.log
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: DAG_with_TaskFlow_API.greet_multiple manual__2023-09-10T05:23:33.215377+00:00 [queued]>
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: DAG_with_TaskFlow_API.greet_multiple manual__2023-09-10T05:23:33.215377+00:00 [queued]>
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1361} INFO - Starting attempt 3 of 3
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1382} INFO - Executing <Task(_PythonDecoratedOperator): greet_multiple> on 2023-09-10 05:23:33.215377+00:00
[2023-09-10, 05:27:46 UTC] {standard_task_runner.py:57} INFO - Started process 16082 to run task
[2023-09-10, 05:27:46 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'DAG_with_TaskFlow_API', 'greet_multiple', 'manual__2023-09-10T05:23:33.215377+00:00', '--job-id', '162', '--raw', '--subdir', 'DAGS_FOLDER/task_with_taskflow_api.py', '--cfg-path', '/tmp/tmptyfhicx8']
[2023-09-10, 05:27:46 UTC] {standard_task_runner.py:85} INFO - Job 162: Subtask greet_multiple
[2023-09-10, 05:27:46 UTC] {task_command.py:415} INFO - Running <TaskInstance: DAG_with_TaskFlow_API.greet_multiple manual__2023-09-10T05:23:33.215377+00:00 [running]> on host f3a4feea079c
[2023-09-10, 05:27:46 UTC] {abstractoperator.py:696} ERROR - Exception rendering Jinja template for task 'greet_multiple', field 'op_args'. Template: (XComArg(<Task(_PythonDecoratedOperator): get_fullname>, 'firstname'), XComArg(<Task(_PythonDecoratedOperator): get_fullname>, 'lastname'))
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/abstractoperator.py", line 688, in _do_render_template_fields
rendered_content = self.render_template(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 162, in render_template
return tuple(self.render_template(element, context, jinja_env, oids) for element in value)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 162, in <genexpr>
return tuple(self.render_template(element, context, jinja_env, oids) for element in value)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 158, in render_template
return value.resolve(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom_arg.py", line 431, in resolve
raise XComNotFound(ti.dag_id, task_id, self.key)
airflow.exceptions.XComNotFound: XComArg result from get_fullname at DAG_with_TaskFlow_API with key="firstname" is not found!
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1943} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode, session=session)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1646, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2291, in render_templates
original_task.render_template_fields(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1244, in render_template_fields
self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/abstractoperator.py", line 688, in _do_render_template_fields
rendered_content = self.render_template(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 162, in render_template
return tuple(self.render_template(element, context, jinja_env, oids) for element in value)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 162, in <genexpr>
return tuple(self.render_template(element, context, jinja_env, oids) for element in value)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 158, in render_template
return value.resolve(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom_arg.py", line 431, in resolve
raise XComNotFound(ti.dag_id, task_id, self.key)
airflow.exceptions.XComNotFound: XComArg result from get_fullname at DAG_with_TaskFlow_API with key="firstname" is not found!
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=DAG_with_TaskFlow_API, task_id=greet_multiple, execution_date=20230910T052333, start_date=20230910T052746, end_date=20230910T052746
[2023-09-10, 05:27:46 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 162 for task greet_multiple (XComArg result from get_fullname at DAG_with_TaskFlow_API with key="firstname" is not found!; 16082)
[2023-09-10, 05:27:46 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-09-10, 05:27:46 UTC] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check
[Update] I even tried replacing greet_multiple(fullname_dict['firstname'], fullname_dict['lastname']) with greet_multiple(firstname=fullname_dict['firstname'], lastname=fullname_dict['lastname']), but still no luck.
multiple_outputs is a parameter on the underlying operator, not an input to the decorated function; think of it like adding retries, queue, pool, and other "general" BaseOperator-related parameters. There are a few examples of explicitly passing a multiple_outputs value in the TaskFlow API tutorial too.
So, moving multiple_outputs=True to the @task decorator itself will fix the XCom retrieval.
# Passing multiple values
@task(multiple_outputs=True)
def get_fullname():
    return {'firstname': 'Ashish', 'lastname': 'Inamdar'}
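To see why key "firstname" is missing in the question's version, here is a toy, pure-Python model that runs without Airflow. The names toy_task and xcom_store are hypothetical, and the sketch is a simplification rather than Airflow's implementation. It assumes only what the answer states: multiple_outputs is read by the decorator/operator, and when it is set, each dict key is pushed as its own XCom key alongside the usual "return_value" entry.

```python
import functools
from typing import Any, Callable, Dict, Optional, Tuple

# Hypothetical in-memory stand-in for the XCom table: (task_id, key) -> value.
xcom_store: Dict[Tuple[str, str], Any] = {}


def toy_task(python_callable: Optional[Callable] = None, *, multiple_outputs: bool = False):
    """Toy decorator loosely modelled on @task; this is NOT Airflow's implementation."""

    def decorate(func: Callable) -> Callable:
        @functools.wraps(func)
        def run(*args: Any, **kwargs: Any) -> Any:
            result = func(*args, **kwargs)
            task_id = func.__name__
            # The whole return value is always stored under "return_value".
            xcom_store[(task_id, "return_value")] = result
            # Per-key entries exist only if the *decorator* received multiple_outputs=True.
            if multiple_outputs and isinstance(result, dict):
                for key, value in result.items():
                    xcom_store[(task_id, key)] = value
            return result

        return run

    # Support both bare @toy_task and @toy_task(...) with options.
    if python_callable is not None:
        return decorate(python_callable)
    return decorate


@toy_task()
def get_fullname_broken(multiple_outputs=True):  # ordinary default argument; the decorator never sees it
    return {"firstname": "Ashish", "lastname": "Inamdar"}


@toy_task(multiple_outputs=True)
def get_fullname_fixed():
    return {"firstname": "Ashish", "lastname": "Inamdar"}


get_fullname_broken()
get_fullname_fixed()
print(("get_fullname_broken", "firstname") in xcom_store)  # False
print(("get_fullname_fixed", "firstname") in xcom_store)   # True
```

In the question's form, multiple_outputs=True is just a default argument of the function. In this model, only the whole dict is stored, so a lookup for key "firstname" finds nothing, which mirrors the XComNotFound error in the logs.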
Airflow will also infer a multiple_outputs value if the return annotation of the decorated function is a dictionary.
This means the following would also work:
from typing import Dict

@task
def get_fullname() -> Dict[str, str]:  # dict[str, str] needs Python 3.9+; the logs show Python 3.8
    return {'firstname': 'Ashish', 'lastname': 'Inamdar'}
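The annotation-based inference can be approximated as shown below. This is a simplified sketch, not Airflow's actual code. infer_multiple_outputs is a hypothetical helper that resolves the function's return annotation and checks whether it names a Mapping type. It uses typing.Dict so that it also runs on Python 3.8, the version shown in the logs.

```python
from collections.abc import Mapping
from typing import Callable, Dict, get_type_hints


def infer_multiple_outputs(func: Callable) -> bool:
    """Approximation of annotation-based inference (hypothetical helper, not Airflow's code)."""
    try:
        hints = get_type_hints(func)
    except (NameError, TypeError):
        # The annotation could not be evaluated; fall back to "no inference".
        return False
    if "return" not in hints:
        return False
    return_type = hints["return"]
    # Generic aliases such as Dict[str, str] expose the bare class (dict) via __origin__.
    origin = getattr(return_type, "__origin__", return_type)
    return isinstance(origin, type) and issubclass(origin, Mapping)


def get_fullname() -> Dict[str, str]:
    return {"firstname": "Ashish", "lastname": "Inamdar"}


def get_name() -> str:
    return "Ashish"


def no_annotation():
    return {"firstname": "Ashish"}


print(infer_multiple_outputs(get_fullname))   # True
print(infer_multiple_outputs(get_name))       # False
print(infer_multiple_outputs(no_annotation))  # False
```

A function without a return annotation gets no inference. In that case, passing multiple_outputs=True to the decorator explicitly is the unambiguous option.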
Thank you @Josh, this really helped. Thank you for the knowledge links too :)