gpt4 book ai didi

airflow - 如何重试完整的 Airflow DAG?

转载 作者:行者123 更新时间:2023-12-05 06:23:26 27 4
gpt4 key购买 nike

我知道可以重试单个任务,但是否可以重试完整的 DAG?

我动态创建任务,这就是为什么我需要重试的不是特定任务,而是完成 DAG。如果 Airflow 不支持它,也许有一些解决方法。

最佳答案

我编写了以下脚本并将其安排在 airflow master 上以重新运行“dag_ids_to_monitor”数组中提到的 DAG 的失败 DAG 运行

import subprocess
import re
from datetime import datetime

dag_ids_to_monitor = ['dag1','dag2','dag2']



def runBash(cmd):
print ("running bash command {}".format(cmd))
output = subprocess.check_output(cmd.split())
return output


def datetime_valid(dt_str):
try:
datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S')
print(dt_str)
print(datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S'))
except:
return False
return True


def get_schedules_to_rerun(dag_id):
bashCommand = f"airflow list_dag_runs --state failed {dag_id}"
output = runBash(bashCommand)

schedules_to_rerun = []
for line in output.split('\n'):
parts = re.split("\s*\|\s*", line)
if len(parts) > 4 and datetime_valid(parts[3][:-6]):
schedules_to_rerun.append(parts[3])
return schedules_to_rerun


def trigger_runs(dag_id, re_run_start_times):
for start_time in re_run_start_times:
runBash(f"airflow clear --no_confirm --start_date {start_time} --end_date {start_time} {dag_id}")


def rerun_failed_dag_runs(dag_id):
re_run_start_times = get_schedules_to_rerun(dag_id)
trigger_runs(dag_id,re_run_start_times)


for dag_id in dag_ids_to_monitor:
rerun_failed_dag_runs(dag_id)

关于airflow - 如何重试完整的 Airflow DAG?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58327891/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com