- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个每 5 分钟运行一次的 DAG(我们称之为 5_min_dag
),另一个 DAG 每天使用当天运行的一些 5_min_dag
的输出运行(我们称之为daily_dag
)。
如何确保 daily_dag
等待当天的 5_min_dag
运行完成?
一些简化的代码来说明问题:
# ./5_min_dag.py
5_min_dag = DAG('5_min_dag', schedule_interval=timedelta(minutes=5))
5_min_task = BashOperator(
task_id='5_min_task',
bash_command="echo date",
dag=5_min_dag
)
# ./daily_dag.py
daily_dag = Dag('daily_dag', schedule_interval=timedelta(days=1))
daily_average_task = BashOperator(
task_id='daily_average_task',
bash_command="~/make_daily_average.py",
dag=daily_dag
)
## pseudocode for what I am missing:
# for each 5_min_dag that is "today" (relative to {{execution_date}})
# set the 5_min_dag upstream of daily_average_task
这可能吗?
也许使用ExternalTaskSensor和/或SubDagOperator?
最佳答案
我发现解决此问题的更好方法是使用 SQLSensor
查询 Airflow 元数据数据库。
首先,一个connection需要设置数据库。我使用 Web UI 设置名为 mysql_default
的连接。
以下运算符被设置为 daily_dag
中的第一个任务。直到 daily_dag
的 execution_date
当天的所有 5_min_dag
都具有 status==success
后才会成功.
wait_for_5_min_dags = SqlSensor(
task_id='wait_for_all_5_min_dags',
conn_id='mysql_default',
sql="""
SELECT GREATEST(COUNT(state)-287, 0)
FROM dag_run WHERE
(execution_date BETWEEN
'{{execution_date.replace(hour=0,minute=0)}}' AND '{{execution_date.replace(hour=23,minute=59)}}')
AND dag_id='5_min_dag'
AND state='success';
"""
)
SQLSensor
仅当查询返回非空或非零结果时才会成功。因此,此查询将返回 0,直到我们恰好找到当天 288
次成功运行的 dag (24*60/5=288
)。如果我们等待每小时运行的 dag,我们将减去 23
,因为我们每天等待 24
dag。
关于airflow - 等待execution_date范围内的一组外部DAG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47840770/
我想在触发器 DAG 中设置 execution_date。我正在使用操作符 TriggerDagRunOperator,这个操作符有参数 execution_date,我想设置当前的 executi
在常规的 python 代码中我可以这样做: import time int(time.time()) 这给了我作为纪元的时间。我希望能够使用 Airflow 宏执行此操作:execution_dat
给定一个 DAG 有一个 start_date,它在特定日期运行,相应的 DAGRun 的 execution_date 是怎样的 定义了吗? 我已阅读 documentation但是一个例子让我感到
这是我的代码: EXEC_TIMESTAMP = "{{ execution_date.strftime('%Y-%m-%d %H:%M') }}" query = """ se
在 Airflow 中,我想每周一上午 8 点运行一次 dag(execution_date 当然应该是“当天星期一上午 8 点”)。为此工作流程设置的相关参数是: 开始日期:“2018-03-19”
最近我对 Airflow 进行了太多测试,以至于 execution_date 有一个问题运行时 airflow trigger_dag . 我了解到 execution_date不是我们第一次从h
来自 cron 的 Airflow 的新手,试图了解 execution_date 宏如何应用于调度系统以及何时手动触发。我已经阅读了常见问题解答,并根据我预期的时间表设置了执行时间,并填写了正确的
在这个答案的帮助下 https://stackoverflow.com/a/41730510/4200352我正在执行一个 python 文件。 我使用 PythonOperator 并尝试将执行日期
假设我有一个 easteregg.py 文件: from airflow import DAG from dateutil import parser from datetime import tim
我是一名优秀的程序员,十分优秀!