gpt4 book ai didi

Airflow - 如何将一个运算符(operator)的输出数据作为输入传递给另一个任务

转载 作者:行者123 更新时间:2023-12-04 10:49:57 25 4
gpt4 key购买 nike

我有一个 http 端点列表,每个端点都独立执行一项任务。我们正在尝试编写一个应用程序,该应用程序将通过按特定顺序调用这些端点来进行编排。在这个解决方案中,我们还必须处理一个 http 端点的输出并为下一个 http 端点生成输入。此外,可以根据触发器同时调用相同的工作流。

到目前为止我所做的,
1. 定义了一个派生自 HttpOperator 的新运算符,并引入了将 http 端点的输出写入文件的功能。
2. 编写了一个python 运算符,它可以根据必要的逻辑传输输出。

由于我可以执行同一工作流的多个实例,因此我无法对输出文件名进行硬编码。有没有办法让我写的 http 操作符写入一些唯一的文件名,并且相同的文件名应该可用于下一个任务,以便它可以读取和处理输出。

最佳答案

Airflow 确实有一个用于运营商交叉通信的功能,称为 XCom

XCom 可以“推送”(发送)或“拉取”(接收)。当一个任务推送一个 XCom 时,它使它普遍可用于其他任务。任务可以通过调用 xcom_push() 方法随时推送 XCom。

任务调用 xcom_pull() 来检索 XCom,可以选择根据键、源 task_id 和源 dag_id 等条件应用过滤器。

推到 XCOM 使用

ti.xcom_push(key=<variable name>, value=<variable value>)

要拉 XCOM 对象使用
myxcom_val = ti.xcom_pull(key=<variable name>, task_ids='<task to pull from>')

使用 bash operator ,您只需设置 xcom_push = True并且 stdout 中的最后一行设置为 xcom 对象。

您可以查看 xcom 对象,当您的任务正在运行时,只需从 Airflow UI 中打开 tast 执行并单击 xcom 选项卡。

关于Airflow - 如何将一个运算符(operator)的输出数据作为输入传递给另一个任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59535809/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com