gpt4 book ai didi

directed-acyclic-graphs - 我们如何使用 TriggerDagRunOperator 触发多个 Airflow dags?

转载 作者:行者123 更新时间:2023-12-04 00:18:16 33 4
gpt4 key购买 nike

我有一个场景,其中一个特定的 dag 在完成时需要触发多个 dag,已经使用 TriggerDagRunOperator 触发单个 dag,是否可以将多个 dag 传递给 TriggerDagRunOperator 以触发多个 dag?

并且是否可以仅在当前 dag 成功完成时触发。

最佳答案

我遇到了同样的问题。没有开箱即用的解决方案,但我们可以为它编写自定义运算符。

所以这里是自定义操作符的代码,得到 python_callabletrigger_dag_id作为参数:

class TriggerMultiDagRunOperator(TriggerDagRunOperator):

@apply_defaults
def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}

def execute(self, context):
session = settings.Session()
created = False
for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
if not dro or not isinstance(dro, DagRunOrder):
break

if dro.run_id is None:
dro.run_id = 'trig__' + datetime.utcnow().isoformat()

dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True
)
created = True
self.log.info("Creating DagRun %s", dr)

if created is True:
session.commit()
else:
self.log.info("No DagRun created")
session.close()
trigger_dag_id是我们想要多次运行的 dag id。
python_callable是一个函数,它应该返回一个列表 DagRunOrder对象,一个对象用于调度一个具有 dag_id 的 DAG 实例 trigger_dag_id .

GitHub 上的代码和示例: https://github.com/mastak/airflow_multi_dagrun
关于此代码的更多描述: https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13

关于directed-acyclic-graphs - 我们如何使用 TriggerDagRunOperator 触发多个 Airflow dags?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44806944/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com