gpt4 book ai didi

call - 使用 TriggerDagRunOperator 多次运行另一个 DAG

转载 作者:行者123 更新时间:2023-12-04 07:49:37 24 4
gpt4 key购买 nike

我有一个 DAG (DAG1),我在其中复制了一堆文件。然后,我想为每个复制的文件启动另一个 DAG (DAG2)。由于每次 DAG1 运行复制的文件数量会有所不同,我想基本上循环文件并使用适当的参数调用 DAG2。

例如:

with DAG( 'DAG1',
description="copy files over",
schedule_interval="* * * * *",
max_active_runs=1
) as dag:


t_rsync = RsyncOperator( task_id='rsync_data',
source='/source/',
target='/destination/' )

t_trigger_preprocessing = TriggerDagRunOperator( task_id='trigger_preprocessing',
trigger_daq_id='DAG2',
python_callable=trigger

)

t_rsync >> t_trigger_preprocessing

我希望使用 python_callable triggert_rsync 中提取相关的 xcom 数据然后触发DAG2;但我不清楚如何做到这一点。

我更愿意将调用 DAG2 的逻辑放在这里以简化 DAG2 的内容(并且还提供带有 max_active_runs 的堆叠示意图)

最佳答案

最终编写了我自己的运算符:

class TriggerMultipleDagRunOperator(TriggerDagRunOperator):
def execute(self, context):
count = 0
for dro in self.python_callable(context):
if dro:
with create_session() as session:
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True)
session.add(dr)
session.commit()
count = count + 1
else:
self.log.info("Criteria not met, moving on")
if count == 0:
raise AirflowSkipException('No external dags triggered')

与 python_callable 类似
def trigger_preprocessing(context):
for base_filename,_ in found.items():
exp = context['ti'].xcom_pull( task_ids='parse_config', key='experiment')
run_id='%s__%s' % (exp['microscope'], datetime.utcnow().replace(microsecond=0).isoformat())
dro = DagRunOrder(run_id=run_id)
d = {
'directory': context['ti'].xcom_pull( task_ids='parse_config', key='experiment_directory'),
'base': base_filename,
'experiment': exp['name'],
}
LOG.info('triggering dag %s with %s' % (run_id,d))
dro.payload = d
yield dro
return

然后将它们与以下内容捆绑在一起:
t_trigger_preprocessing = TriggerMultipleDagRunOperator( task_id='trigger_preprocessing',
trigger_dag_id='preprocessing',
python_callable=trigger_preprocessing
)

关于call - 使用 TriggerDagRunOperator 多次运行另一个 DAG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47103160/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com