gpt4 book ai didi

airflow - Apache Airflow 不强制执行 dagrun_timeout

转载 作者:行者123 更新时间:2023-12-03 20:57:19 32 4
gpt4 key购买 nike

我将 Apache Airflow 版本 1.10.3 与顺序执行器一起使用,如果 DAG 尚未完成,我希望 DAG 在一段时间后失败。我试过设置 dagrun_timeout在示例代码中

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2019, 6, 1),
'retries': 0,
}

dag = DAG('min_timeout', default_args=default_args, schedule_interval=timedelta(minutes=5), dagrun_timeout = timedelta(seconds=30), max_active_runs=1)

t1 = BashOperator(
task_id='fast_task',
bash_command='date',
dag=dag)

t2 = BashOperator(
task_id='slow_task',
bash_command='sleep 45',
dag=dag)

t2.set_upstream(t1)
slow_task单独花费的时间超过了 dagrun_timeout 设置的时间限制,所以我的理解是 Airflow 应该停止 DAG 执行。但是,这不会发生,并且允许 slow_task 在其整个持续时间内运行。发生这种情况后,运行将被标记为失败,但这不会根据需要终止任务或 DAG。使用 execution_timeoutslow_task确实会导致任务在指定的时间限制内被终止,但我更愿意使用 DAG 的总体时间限制而不是指定 execution_timeout对于每个任务。
还有什么我应该尝试实现这种行为的,或者我可以修复的任何错误?

最佳答案

Airflow 调度器 runs a loop至少每个 SCHEDULER_HEARTBEAT_SEC (默认值为 5 seconds )。
牢记at least在这里,因为调度程序执行了一些可能会延迟其循环的下一个周期的操作。
这些行动包括:

  • 解析 dag
  • 填写 DagBag
  • 检查 DagRun 并更新它们的状态
  • 安排下一个 DagRun

  • 在您的示例中,延迟任务不会在 dagrun_timeout 处终止。因为调度程序在任务完成后执行它的下一个周期。

    关于airflow - Apache Airflow 不强制执行 dagrun_timeout,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60330553/

    32 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com