gpt4 book ai didi

python - Airflow :为每个文件运行 DAG 的正确方法

转载 作者:行者123 更新时间:2023-12-03 16:22:18 26 4
gpt4 key购买 nike

我有以下任务要解决:

Files are being sent at irregular times through an endpoint and stored locally. I need to trigger a DAG run for each of these files. For each file the same tasks will be performed



总体流程如下:对于每个文件,运行任务 A->B->C->D

正在批量处理文件。虽然这项任务对我来说似乎微不足道,但我找到了几种方法来做到这一点,我很困惑哪一个是“合适的”(如果有的话)。

第一种模式:使用实验性 REST API 来触发 dag。

也就是说,公开一个 Web 服务,它接收请求和文件,将其存储到一个文件夹中,并使用 experimental REST api通过将 file_id 作为 conf 传递来触发 DAG

缺点 :REST apis 仍然是实验性的,不确定 Airflow 如何处理一次有许多请求的负载测试(这不应该发生,但是,如果发生了怎么办?)

第二种模式:2 dags。一种使用 TriggerDagOperator 感知和触发,一种处理。

始终使用与之前描述的相同的 ws,但这次它只是存储文件。然后我们有:
  • 第一个 dag:使用 FileSensor 和 TriggerDagOperator 来触发给定 N 个文件的 N 个 dag
  • 第二个目标:任务 A->B->C

  • 缺点 :需要避免将相同的文件发送到两个不同的 DAG 运行。
    例子:

    文件夹 x.json 中的文件
    传感器找到 x,触发 DAG (1)

    传感器返回并重新安排。如果 DAG (1) 未处理/移动文件,则传感器 DAG 可能会使用相同的文件重新安排新的 DAG 运行。这是不需要的。

    第三种模式:对于文件中的文件,任务 A->B->C

    this question 中所见.

    缺点 :这可以工作,但是我不喜欢的是 UI 可能会变得一团糟,因为每次 DAG 运行看起来都不一样,但它会随着正在处理的文件数量而变化。此外,如果有 1000 个文件要处理,运行可能会很难阅读

    第四种模式:使用 subdags

    我还不确定它们是如何完全工作的,正如我所看到的 they are not encouraged (最后),但是应该可以为每个文件生成一个 subdag 并让它运行。类似于 this question .

    缺点 :似乎 subdags 只能与顺序执行器一起使用。

    我是否遗漏了一些东西并过度思考了(在我看来)应该非常直接的东西?谢谢

    最佳答案

    我知道我迟到了,但我会选择第二种模式:“2 dags。一个使用 TriggerDagOperator 感知和触发,一个进程”,因为:

  • 每个文件都可以并行执行
  • 第一个 DAG 可以选择要处理的文件,重命名它(添加后缀“_processing”或将其移动到处理文件夹)
  • 如果我是贵公司的新开发人员,我打开工作流,我想了解工作流的逻辑是什么,而不是上次动态构建时处理了哪些文件
  • 如果 dag 2, 发现文件有问题,则将其重命名(使用“_error”后缀或将其移动到错误文件夹)
  • 这是一种无需创建任何额外运算符即可处理文件的标准方法
  • 它使 de DAG 具有幂等性且更易于测试。更多信息在此 article

  • 重命名和/或移动文件是在每个 ETL 中处理文件的非常标准的方法。

    对了,我一直推荐这篇文章 https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753 .它没有

    关于python - Airflow :为每个文件运行 DAG 的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60082546/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com