TaskFlow API not updating XComs

Reposted. Author: bug小助手. Updated: 2023-10-25 15:01:48

I am trying out the TaskFlow API and want to print values passed between tasks via XComs, but I cannot get it to work. Could anyone tell me where I am going wrong?


from datetime import datetime, timedelta

from airflow.decorators import dag, task

default_args = {
    'owner': 'ashish',
    'retries': '2',
    'retry_delay': timedelta(minutes=2)
}


@dag(dag_id='DAG_with_TaskFlow_API',
     default_args=default_args,
     description='Example for Task Flow API',
     start_date=datetime(2023, 8, 30, 2),
     schedule_interval='@daily')
def hello():

    # Setting the value normally
    @task()
    def get_name():
        return 'Ashish'

    # Passing multiple values
    @task()
    def get_fullname(multiple_outputs=True):
        return {'firstname': 'Ashish', 'lastname': 'Inamdar'}

    @task()
    def greet(name):
        print(f"Hello {name}")

    @task()
    def greet_multiple(firstname, lastname):
        print(f"Hi {firstname} {lastname}")

    name = get_name()
    greet(name=name)

    fullname_dict = get_fullname()
    greet_multiple(fullname_dict['firstname'], fullname_dict['lastname'])


taskflow_api_dag = hello()

I have read and tried the approach shown in another answer by Josh Fell, in the linked question about passing the return value of one task as an argument to another task.


Below are the Airflow logs for the failing greet_multiple task:


3a4feea079c
*** Found local files:
*** * /opt/airflow/logs/dag_id=DAG_with_TaskFlow_API/run_id=manual__2023-09-10T05:23:33.215377+00:00/task_id=greet_multiple/attempt=3.log
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: DAG_with_TaskFlow_API.greet_multiple manual__2023-09-10T05:23:33.215377+00:00 [queued]>
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: DAG_with_TaskFlow_API.greet_multiple manual__2023-09-10T05:23:33.215377+00:00 [queued]>
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1361} INFO - Starting attempt 3 of 3
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1382} INFO - Executing <Task(_PythonDecoratedOperator): greet_multiple> on 2023-09-10 05:23:33.215377+00:00
[2023-09-10, 05:27:46 UTC] {standard_task_runner.py:57} INFO - Started process 16082 to run task
[2023-09-10, 05:27:46 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'DAG_with_TaskFlow_API', 'greet_multiple', 'manual__2023-09-10T05:23:33.215377+00:00', '--job-id', '162', '--raw', '--subdir', 'DAGS_FOLDER/task_with_taskflow_api.py', '--cfg-path', '/tmp/tmptyfhicx8']
[2023-09-10, 05:27:46 UTC] {standard_task_runner.py:85} INFO - Job 162: Subtask greet_multiple
[2023-09-10, 05:27:46 UTC] {task_command.py:415} INFO - Running <TaskInstance: DAG_with_TaskFlow_API.greet_multiple manual__2023-09-10T05:23:33.215377+00:00 [running]> on host f3a4feea079c
[2023-09-10, 05:27:46 UTC] {abstractoperator.py:696} ERROR - Exception rendering Jinja template for task 'greet_multiple', field 'op_args'. Template: (XComArg(<Task(_PythonDecoratedOperator): get_fullname>, 'firstname'), XComArg(<Task(_PythonDecoratedOperator): get_fullname>, 'lastname'))
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/abstractoperator.py", line 688, in _do_render_template_fields
    rendered_content = self.render_template(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 162, in render_template
    return tuple(self.render_template(element, context, jinja_env, oids) for element in value)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 162, in <genexpr>
    return tuple(self.render_template(element, context, jinja_env, oids) for element in value)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 158, in render_template
    return value.resolve(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom_arg.py", line 431, in resolve
    raise XComNotFound(ti.dag_id, task_id, self.key)
airflow.exceptions.XComNotFound: XComArg result from get_fullname at DAG_with_TaskFlow_API with key="firstname" is not found!
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1943} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1518, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1646, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2291, in render_templates
    original_task.render_template_fields(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1244, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/abstractoperator.py", line 688, in _do_render_template_fields
    rendered_content = self.render_template(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 162, in render_template
    return tuple(self.render_template(element, context, jinja_env, oids) for element in value)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 162, in <genexpr>
    return tuple(self.render_template(element, context, jinja_env, oids) for element in value)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/template/templater.py", line 158, in render_template
    return value.resolve(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom_arg.py", line 431, in resolve
    raise XComNotFound(ti.dag_id, task_id, self.key)
airflow.exceptions.XComNotFound: XComArg result from get_fullname at DAG_with_TaskFlow_API with key="firstname" is not found!
[2023-09-10, 05:27:46 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=DAG_with_TaskFlow_API, task_id=greet_multiple, execution_date=20230910T052333, start_date=20230910T052746, end_date=20230910T052746
[2023-09-10, 05:27:46 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 162 for task greet_multiple (XComArg result from get_fullname at DAG_with_TaskFlow_API with key="firstname" is not found!; 16082)
[2023-09-10, 05:27:46 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-09-10, 05:27:46 UTC] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check

Comments

[Update] I also tried replacing greet_multiple(fullname_dict['firstname'], fullname_dict['lastname']) with greet_multiple(firstname=fullname_dict['firstname'], lastname=fullname_dict['lastname']), but still no luck.

Recommended answer

multiple_outputs is a parameter on the underlying operator and not an input to the decorated function; think of it like adding retries, queue, pool, and other "general" BaseOperator-related parameters. There are a few examples of explicitly passing a multiple_outputs value in the TaskFlow API tutorial too.


So moving multiple_outputs=True out of the function signature and into the @task decorator itself will fix the XCom retrieval.


    # Passing multiple values
    @task(multiple_outputs=True)
    def get_fullname():
        return {'firstname': 'Ashish', 'lastname': 'Inamdar'}
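
To see why the placement matters, here is a minimal pure-Python sketch of the behaviour described above. It does not use Airflow: `toy_task`, `XCOM_STORE`, and `pull` are hypothetical stand-ins, written on the assumption that a task's return value is always stored under the `return_value` key and is additionally stored under one key per dict entry only when `multiple_outputs=True` is given to the decorator.

```python
# Toy model only: none of these names are Airflow APIs.
XCOM_STORE = {}  # maps (task_id, key) -> value


def toy_task(multiple_outputs=False):
    """Hypothetical stand-in for @task; options are read from the decorator call."""
    def decorator(func):
        def run():
            result = func()
            # The whole return value is always stored under "return_value".
            XCOM_STORE[(func.__name__, "return_value")] = result
            # Per-key entries exist only if the decorator was told to unroll the dict.
            if multiple_outputs and isinstance(result, dict):
                for key, value in result.items():
                    XCOM_STORE[(func.__name__, key)] = value
            return result
        return run
    return decorator


def pull(task_id, key):
    """Look up a stored value, failing loudly when the key was never stored."""
    if (task_id, key) not in XCOM_STORE:
        raise KeyError(f'result from {task_id} with key="{key}" is not found')
    return XCOM_STORE[(task_id, key)]


@toy_task()
def wrong_placement(multiple_outputs=True):  # just an unused function argument
    return {"firstname": "Ashish", "lastname": "Inamdar"}


@toy_task(multiple_outputs=True)
def right_placement():
    return {"firstname": "Ashish", "lastname": "Inamdar"}


wrong_placement()
right_placement()

print(pull("right_placement", "firstname"))
try:
    pull("wrong_placement", "firstname")
except KeyError as exc:
    print("lookup failed:", exc)
```

In this toy model the decorator never inspects the wrapped function's defaults, so the misplaced argument has no effect and the lookup by `firstname` fails, which mirrors the XComNotFound error in the logs.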

Airflow will also infer multiple_outputs=True if the return type annotation of the decorated function is a dict type.


Meaning this would also work:


    @task
    def get_fullname() -> dict[str, str]:
        return {'firstname': 'Ashish', 'lastname': 'Inamdar'}
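
The inference can be pictured with the following simplified sketch. `infers_multiple_outputs` is a hypothetical helper, not Airflow's implementation; it only checks whether a function's return annotation resolves to a mapping type. It uses `typing.Dict` because the built-in `dict[str, str]` form needs Python 3.9 or later, whereas the paths in the logs above show Python 3.8.

```python
from collections.abc import Mapping
from typing import Dict, get_origin, get_type_hints


def infers_multiple_outputs(func):
    """Hypothetical helper: True if the return annotation is a mapping type."""
    return_type = get_type_hints(func).get("return")
    # Dict[str, str] has origin dict; a bare `dict` annotation has no origin.
    base = get_origin(return_type) or return_type
    return isinstance(base, type) and issubclass(base, Mapping)


def annotated() -> Dict[str, str]:
    return {"firstname": "Ashish", "lastname": "Inamdar"}


def unannotated():
    return {"firstname": "Ashish", "lastname": "Inamdar"}


print(infers_multiple_outputs(annotated))
print(infers_multiple_outputs(unannotated))
```

Under this model, only the annotated function would be treated as producing multiple outputs; the unannotated one would still need the explicit decorator argument.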

Comments

Thank you @Josh, this really helped. Thank you for the knowledge links too :)
