gpt4 book ai didi

python - 传递字符串列表作为 Airflow 中依赖任务的参数

转载 作者:太空宇宙 更新时间:2023-11-03 15:50:36 25 4
gpt4 key购买 nike

我正在尝试通过 XCom 将字符串列表从一个任务传递到另一个任务但我似乎无法将推送的列表解释回列表。

例如,当我在 ShortCircuitOperator 中运行的某个函数 blah 中执行此操作时:

paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=full_paths)

然后我想使用这样的列表作为运算符的参数。例如,

run_task_after_blah = AfterBlahOperator(
task_id='run-task-after-blah',
...,
input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
...,
)

我希望 input_paths 等于 paths 但它不是,因为渲染首先发生然后赋值,并且模板渲染在某种程度上转换了 xcom_pull 返回一个 stringified 列表(然后我的 AfterBlahOperator 插入将其分配为 JSON 中元素的值。

我尝试将 paths 连接成一个由一些分隔符分隔的字符串,并将其推送到 XCom,然后在从 XCom 中拉出时将其拆分回来,但是当 XCom 首先呈现时,我得到,要么当在模板内调用 split 函数时 stringified 列表或 paths 的原始连接字符串,如果 split函数应用于参数(如 "{{ ti.xcom_pull(task_ids='find-paths') }}".split(';')

XCom 似乎非常适用于将单个值作为任务参数或多个值(当可以进一步处理提取的值时),但不适用于将 multiple_values 转换为“一个”作为任务参数。

有没有一种方法可以做到这一点而不必编写一个额外的函数来精确返回这样的字符串列表?或者也许我过度滥用 XCom,但 Airflow 中有许多运算符将元素列表作为参数(例如,通常是多个文件的完整路径,这些文件是先前任务的结果,因此事先不知道)。

最佳答案

Jinja 呈现字符串,因此如果您通过模板获取 XCom,它始终是一个字符串。相反,您将需要获取您有权访问 TaskInstance 对象的 XCom。像这样:

class AfterBlahOperator(BaseOperator):

def __init__(self, ..., input_task_id, *args, **kwargs):
...
self.input_task_id = input_task_id
super(AfterBlahOperator, self).__init__(*args, **kwargs)

def execute(self, context):
input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id)
for path in input_paths:
...

这类似于您在 PythonOperator 中获取它的方式,XCom docs举个例子。

请注意,您仍然可以支持单独的 input_paths 参数,当它可以硬编码到 DAG 中时,您只需要额外检查一下以查看从哪个参数读取值。

关于python - 传递字符串列表作为 Airflow 中依赖任务的参数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47052582/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com