gpt4 book ai didi

airflow - 等待execution_date范围内的一组外部DAG

转载 作者:行者123 更新时间:2023-12-01 18:44:13 25 4
gpt4 key购买 nike

我有一个每 5 分钟运行一次的 DAG(我们称之为 5_min_dag),另一个 DAG 每天使用当天运行的一些 5_min_dag 的输出运行(我们称之为daily_dag)。

如何确保 daily_dag 等待当天的 5_min_dag 运行完成?

一些简化的代码来说明问题:


# ./5_min_dag.py
5_min_dag = DAG('5_min_dag', schedule_interval=timedelta(minutes=5))
5_min_task = BashOperator(
task_id='5_min_task',
bash_command="echo date",
dag=5_min_dag
)

# ./daily_dag.py
daily_dag = Dag('daily_dag', schedule_interval=timedelta(days=1))

daily_average_task = BashOperator(
task_id='daily_average_task',
bash_command="~/make_daily_average.py",
dag=daily_dag
)

## pseudocode for what I am missing:
# for each 5_min_dag that is "today" (relative to {{execution_date}})
# set the 5_min_dag upstream of daily_average_task

这可能吗?

也许使用ExternalTask​​Sensor和/或SubDagOperator?

最佳答案

我发现解决此问题的更好方法是使用 SQLSensor 查询 Airflow 元数据数据库。

首先,一个connection需要设置数据库。我使用 Web UI 设置名为 mysql_default 的连接。

以下运算符被设置为 daily_dag 中的第一个任务。直到 daily_dagexecution_date 当天的所有 5_min_dag 都具有 status==success 后才会成功.

    wait_for_5_min_dags = SqlSensor(
task_id='wait_for_all_5_min_dags',
conn_id='mysql_default',
sql="""
SELECT GREATEST(COUNT(state)-287, 0)
FROM dag_run WHERE
(execution_date BETWEEN
'{{execution_date.replace(hour=0,minute=0)}}' AND '{{execution_date.replace(hour=23,minute=59)}}')
AND dag_id='5_min_dag'
AND state='success';
"""
)

SQLSensor 仅当查询返回非空或非零结果时才会成功。因此,此查询将返回 0,直到我们恰好找到当天 288 次成功运行的 dag (24*60/5=288)。如果我们等待每小时运行的 dag,我们将减去 23,因为我们每天等待 24 dag。

关于airflow - 等待execution_date范围内的一组外部DAG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47840770/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com