gpt4 book ai didi

airflow - 在 Google Composer 中删除 DAG - Airflow UI

转载 作者:行者123 更新时间:2023-12-02 16:13:22 26 4
gpt4 key购买 nike

我想从 Airflow UI 中删除 DAG,该 DAG 在 GCS/dags 文件夹中不再可用。我知道 Airflow 有一种"new"方法可以使用以下方法从数据库中删除 dagsairflow delete_dag my_dag_id 命令,见 https://stackoverflow.com/a/49683543/5318634

似乎在 Composer Airflow 版本中尚不支持 delete_dag 命令。

不要尝试这个:我也尝试过使用airflow Resetdb,但airflow UI死掉了

有没有办法删除当前不在 gs://BUCKET/dags/ 文件夹中的 dags?

最佳答案

我创建了一个 DAG 来清理 UI。它从 Airflow 变量中读取afDagID

from airflow import DAG
from airflow import models
from airflow.operators.mysql_operator import MySqlOperator
import logging
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('ManageAirFlow', description='Deletes Airflow DAGs from backend: Uses vars- afDagID',
schedule_interval=None,
start_date=datetime(2018, 3, 20), catchup=False)

DeleteXComOperator = MySqlOperator(
task_id='delete-xcom-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from xcom where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)

DeleteTaskOperator = MySqlOperator(
task_id='delete-task-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from task_instance where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)

DeleteSLAMissOperator = MySqlOperator(
task_id='delete-sla-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from sla_miss where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)

DeleteLogOperator = MySqlOperator(
task_id='delete-log-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from log where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)

DeleteJobOperator = MySqlOperator(
task_id='delete-job-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from job where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)

DeleteDagRunOperator = MySqlOperator(
task_id='delete-dag_run-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from dag_run where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)

DeleteDagOperator = MySqlOperator(
task_id='delete-dag-record-task',
mysql_conn_id='airflow_db',
sql="DELETE from dag where dag_id='{}'".format(models.Variable.get('afDagID')),
dag=dag)



DeleteXComOperator >> DeleteTaskOperator >> DeleteSLAMissOperator >> DeleteLogOperator >> DeleteJobOperator >> DeleteDagRunOperator >> DeleteDagOperator

关于airflow - 在 Google Composer 中删除 DAG - Airflow UI,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50632623/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com