gpt4 book ai didi

python - 并行处理 Airflow 上的百万个文件列表

转载 作者:行者123 更新时间:2023-12-03 23:52:16 25 4
gpt4 key购买 nike

我有一个 ETL 管道,其中包含以下任务:

  • 抓取网站以收集 csv 文件的 URL
  • 从步骤 1
  • 的 url 下载所有文件
  • 处理在步骤 2
  • 中下载的每个文件

    我对这些步骤中的每一个都有一个 python 函数,我们称它们为 {f1,f2,f3}
    multiprocessing模块和 Pool.map打电话,我可以开火 n processes两个 f2, f3
    我正在将整个脚本转换为 Airflow DAG。我的问题是假设,百万个文件的规模,我特别想在第 3 步中使用我的整个 Celery worker 集群来并行处理文件。

    例如,假设我有 100 个 celery 工作节点,每个节点有 2 个内核,这给了我 200 个内核——我想并行处理至少 100 个文件。

    我该怎么做呢?

    最佳答案

    Airflow 可用于数以千计的动态任务,但不应该。 Airflow DAG 应该是相当稳定的。例如,您仍然可以使用 Airflow 来处理所有抓取的数据,并在以后的 ETL 流程中使用这些信息。

    大量的动态任务会导致 DAG 像这样运行:

    enter image description here

    这会导致 GUI 和日志文件中出现许多垃圾信息。

    我建议在 Celery library 之上构建你的任务系统(不要弄乱 Airflow 中的 CeleryExecutor,因为 Airflow 可以在 Celery 上使用)。它是一个专注于数百万实时任务的任务队列:

    Celery is used in production systems to process millions of tasks a day.



    Celery 是用 Python 编写的,可用于生产,稳定且可扩展。我认为这是解决您的问题的最佳工具。

    但是如果你真的只想使用 Airflow,你可以阅读 this文章(关于动态 DAG 生成)和 this文章(关于 DAG 内的动态任务生成)。

    关于python - 并行处理 Airflow 上的百万个文件列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55829649/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com