gpt4 book ai didi

python - 从一个 Airflow DAG 返回值到另一个

转载 作者:行者123 更新时间:2023-12-04 11:29:56 36 4
gpt4 key购买 nike

我的 DAG(我们称之为 DAG_A)使用 trigger_dagrun 启动另一个 DAG(DAG_B)运算符(operator)。 DAG_B 的任务使用 XCOM,我想在完成后从 DAG_B 运行的任务之一(正是我开始的任务)中获取 XCOM 值。
使用 XCOM 不是硬性要求 - 基本上 Airflow 本身提供的任何(合理的)机制都可以工作。如果需要,我可以更改 DAG_B。
找不到此类案例的任何示例,因此感谢您的帮助。
计划 B 是让 DAG_B 将 XCOM 值与一些运行 ID 一起保存到一些持久性存储中,例如 DB 或文件,DAG_A 将从那里获取它。但如果有一些内置机制可用,我想避免这种复杂化。

最佳答案

您可以通过传入 dag_id 从另一个 dag 中提取 XCOM 值。至 xcom_pull() (请参阅 task_instance.xcom_pull() function documentation )。只要您使用与当前 DAG 相同的执行日期触发 subdag,这就会起作用。这可以通过模板化 execution_date 轻松实现。值(value):

trigger = TriggerDagRunOperator(
task_id="trigger_dag_b",
trigger_dag_id="DAG_B",
execution_date="{{ execution_date }}",
...
)
然后,假设您使用了 ExternalTaskSensor传感器等待特定任务完成或使用 wait_for_completion=True在您的 TriggerDagRunOperator()任务,稍后您可以使用 task_instance.xcom_pull(dag_id="DAG_B", ...) 拉取 XCOM (添加任务 id 和/或您要提取的 XCOM key )。
如果您不反对编写 Python 运算符,您还可以导入 XCom模型,只需使用它的 XCom.get_one() method直接地:
value = XCom.get_one(
execution_date=ti.execution_date,
key="target key",
task_id="some.task.id",
dag_id="DAG_B",
)
我使用了类似的技术,使用多 dagrun 触发器(处理可变数量的资源);这比较棘手,因为在这种情况下您不能重复使用执行日期(每个 dagrun 必须有一个唯一的 (dag_id, execution_date) 元组)。
在这些情况下,我要么使用直接查询(使用触发器存储在 XCom 中的 dagrun id 将 SQLAlchemy XCom 模型与 DagRun 模型结合起来,而不是依赖于执行日期匹配),或者通过配置来避免整个问题前面的 subdags。后者是通过设置带有配置的子 dag 来实现的,该配置告诉它在哪里输出父 dag 然后选择的结果。文档似乎没有正确提及这一点,但 conf论据 TriggerDagRun()也支持模板化,因此您可以在其中生成字典作为子 dag 的输入,子 dag 中的任务然后通过 params 引用配置.

关于python - 从一个 Airflow DAG 返回值到另一个,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67299728/

36 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com