gpt4 book ai didi

airflow - 如何为Apache Airflow DAG定义超时?

转载 作者:行者123 更新时间:2023-12-03 16:43:00 42 4
gpt4 key购买 nike

我使用的是airflow 1.10.2,但Airflow似乎忽略了我为DAG设置的超时。

我正在使用dagrun_timeout参数设置DAG的超时时间(例如20秒),并且我有一个需要2分钟才能运行的任务,但是 Airflow 将DAG标记为成功!

args = {
'owner': 'me',
'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True
}

dag = DAG('test_timeout',
schedule_interval=None,
default_args=args,
dagrun_timeout=timedelta(seconds=20))

def this_passes(**kwargs):
return

def this_passes_with_delay(**kwargs):
time.sleep(120)
return

would_succeed = PythonOperator(task_id='would_succeed',
dag=dag,
python_callable=this_passes,
email=to)

would_succeed_with_delay = PythonOperator(task_id='would_succeed_with_delay',
dag=dag,
python_callable=this_passes_with_delay,
email=to)

would_succeed >> would_succeed_with_delay

不会引发任何错误消息。我使用了不正确的参数吗?

最佳答案

source code中所述:

:param dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created. The timeout
is only enforced for scheduled DagRuns, and only once the
# of active DagRuns == max_active_runs.

因此,在设置 schedule_interval=None时,这可能是预期的行为。这里的想法是确保计划的DAG不会永远持续下去并阻止后续的运行实例。

现在,您可能对所有运算符中可用的 execution_timeout 感兴趣。
例如,您可以像这样在 PythonOperator上设置60s超时:

would_succeed_with_delay = PythonOperator(task_id='would_succeed_with_delay',
dag=dag,
execution_timeout=timedelta(seconds=60),
python_callable=this_passes_with_delay,
email=to)

关于airflow - 如何为Apache Airflow DAG定义超时?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57110885/

42 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com