
airflow - How to get a list of all failed tasks from different DAGs


We have the following command, with which we can clear the failed tasks and re-run them in a single attempt:

airflow clear [-s START_DATE] [-e END_DATE] --only_failed dag_id
Is there any way to get information about all failed tasks across all DAGs and export it to a file (Excel or text)?

Best Answer

Here are some untested code snippets that should help you:

  • Get the list of failed TaskInstance(s) (you can modify it to add filters, e.g. on dag_id or start_date; see the filter sketch after this list)
    from typing import List, Optional

    from airflow.models.taskinstance import TaskInstance
    from airflow.settings import Session
    from airflow.utils.db import provide_session
    from airflow.utils.state import State

    @provide_session
    def get_failed_task_instances(session: Optional[Session] = None) -> List[TaskInstance]:
        """
        Returns list of failed TaskInstance(s)
            - for all DAGs since inception of time
            - sorted by (1) dag_id ASC (2) start_date DESC
        :param session: Optional[Session]
        :return: List[TaskInstance]
        """
        failed_task_instances: List[TaskInstance] = session.query(TaskInstance). \
            filter(TaskInstance.state == State.FAILED). \
            order_by(TaskInstance.dag_id.asc(), TaskInstance.start_date.desc()). \
            all()
        return failed_task_instances
  • (Utility function) Extract the relevant bits from a TaskInstance, such as dag_id, start_date & task_id (change these as per your needs)
    def ti_to_string(ti: TaskInstance) -> List[str]:
        """
        Converts a TaskInstance into List[str] by extracting relevant bits of info from it
        :param ti: TaskInstance
        :return: List[str]
        """
        # start_date is stringified so the row matches the declared List[str]
        return [ti.dag_id, str(ti.start_date), ti.task_id]
  • Putting it all together: write the data to an output CSV file (a usage sketch follows after this list)
    import csv

    def write_failed_task_instances_to_csv(output_file_path: str) -> None:
        """
        Writes the list of failed tasks to the provided output CSV filepath
        :param output_file_path: str
        :return: None
        """
        # prepare list of failed TaskInstance(s)
        failed_task_instances: List[TaskInstance] = get_failed_task_instances()
        # extract relevant bits of info from TaskInstance(s) list (to serialize them)
        failed_task_instances_data: List[List[str]] = list(map(ti_to_string, failed_task_instances))
        # write data of failed TaskInstance(s) to output CSV filepath
        with open(output_file_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerows(failed_task_instances_data)
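
As noted in the first snippet, the query can be narrowed down with extra filters. Below is a minimal, untested sketch of such a variant, reusing the imports from the first snippet; the function name get_failed_task_instances_for_dag is illustrative and not part of the original answer.

    @provide_session
    def get_failed_task_instances_for_dag(dag_id: str,
                                          session: Optional[Session] = None) -> List[TaskInstance]:
        """
        Same query as above, but restricted to a single DAG
        (further filters, e.g. on start_date, can be added the same way)
        :param dag_id: str
        :param session: Optional[Session]
        :return: List[TaskInstance]
        """
        return session.query(TaskInstance). \
            filter(TaskInstance.dag_id == dag_id). \
            filter(TaskInstance.state == State.FAILED). \
            order_by(TaskInstance.start_date.desc()). \
            all()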
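
To actually run the export, the function can be called from an ad-hoc script or scheduled as a task. Here is a minimal, untested sketch of the latter; the module name failed_task_report, the DAG id and the output path are assumptions made purely for illustration, and the PythonOperator import path shown is the Airflow 1.10-style one (matching the --only_failed CLI syntax used in the question).

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    # assumption: the snippets above are saved as a module named failed_task_report.py
    from failed_task_report import write_failed_task_instances_to_csv

    with DAG(dag_id="export_failed_task_instances",  # illustrative DAG id
             start_date=datetime(2021, 1, 1),
             schedule_interval="@daily",
             catchup=False) as dag:
        export_report = PythonOperator(
            task_id="export_failed_task_instances_to_csv",
            python_callable=write_failed_task_instances_to_csv,
            # illustrative output path; point it wherever the report should land
            op_kwargs={"output_file_path": "/tmp/failed_task_instances.csv"},
        )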

  • References
  • views.py: this file is a good place to discover Airflow's SQLAlchemy magic
  • SQLAlchemy multiple order_bys
  • Writing a Python list of lists to CSV
  • Regarding "airflow - How to get a list of all failed tasks from different DAGs", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/63428936/
