gpt4 book ai didi

airflow - Apache Airflow - 如何在使用 TriggerDagRunOperator 触发的流中检索运算符外部的 dag_run 数据

转载 作者:行者123 更新时间:2023-12-01 15:02:06 25 4
gpt4 key购买 nike

我设置了两个 DAG,我们称第一个为 orchestrator,第二个为 worker。 Orchestrator 的工作是从 API 中检索一个列表,并针对该列表中的每个元素,使用一些参数触发工作 DAG。

我将这两个工作流程分开的原因是我希望能够仅重播失败的“工作”工作流程(如果一个失败,我不想重播所有工作实例)。

我能够让事情正常进行,但现在我看到了监控是多么困难,因为我的 task_id 对所有人都是一样的,所以我决定根据“orchestrator”工作流从 API 检索到的值来使用动态 task_id。

但是,我无法从运算符外部的 dag_run 对象中检索值。基本上,我希望这个工作:

with models.DAG('specific_workflow', schedule_interval=None, default_args=default_dag_args) as dag:
name = context['dag_run'].name
hello_world = BashOperator(task_id='hello_{}'.format(name), bash_command="echo Hello {{ dag_run.conf.name }}", dag=dag)
bye = BashOperator(task_id='bye_{}'.format(name), bash_command="echo Goodbye {{ dag_run.conf.name }}", dag=dag)

hello_world >> bye

但我无法定义这个“上下文”对象。但是,我可以从运算符(例如 PythonOperator 和 BashOperator)访问它。

是否可以在运算符(operator)之外检索 dag_run 对象?

最佳答案

我认为目前这不容易。例如,作为工作程序运行过程的一部分,除了在哪里可以找到 DAG 之外,在没有提供任何 TaskInstance 上下文的情况下检索 DAG:https://github.com/apache/incubator-airflow/blob/f18e2550543e455c9701af0995bc393ee6a97b47/airflow/bin/cli.py#L353

稍后注入(inject)上下文:https://github.com/apache/incubator-airflow/blob/c5f1c6a31b20bbb80b4020b753e88cc283aaf197/airflow/models.py#L1479
run_id DAG 将是存储此信息的好地方。

关于airflow - Apache Airflow - 如何在使用 TriggerDagRunOperator 触发的流中检索运算符外部的 dag_run 数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51731791/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com