gpt4 book ai didi

带有自定义字段的 Airflow 自定义指标和/或结果对象

转载 作者:行者123 更新时间:2023-12-03 14:55:53 31 4
gpt4 key购买 nike

在通过 Airflow 运行 pySpark SQL 管道时,我有兴趣获取一些业务统计信息,例如:

  • 源读取计数
  • 目标写入计数
  • 处理过程中 DF 的大小
  • 错误记录数

  • 一种想法是将其直接推送到指标,因此它会被 Prometheus 等监控工具自动使用。另一个想法是通过一些 DAG 结果对象获取这些值,但我无法在文档中找到任何关于它的信息。

    如果您有解决方案,请至少发布一些伪代码。

    最佳答案

    我希望在 airflow.stats.Stats 中重用 Airflow 的统计和监控支持。类(class)。也许是这样的:

    import logging
    from airflow.stats import Stats

    PYSPARK_LOG_PREFIX = "airflow_pyspark"


    def your_python_operator(**context):
    [...]

    try:
    Stats.incr(f"{PYSPARK_LOG_PREFIX}_read_count", src_read_count)
    Stats.incr(f"{PYSPARK_LOG_PREFIX}_write_count", tgt_write_count)
    # So on and so forth
    except:
    logging.exception("Caught exception during statistics logging")

    [...]

    关于带有自定义字段的 Airflow 自定义指标和/或结果对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55900960/

    31 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com