gpt4 book ai didi

queue - 以用户为中心的工作流程的 Airflow DAG 设计

转载 作者:行者123 更新时间:2023-12-02 02:47:33 25 4
gpt4 key购买 nike

我们正在考虑使用 Airflow 来取代我们目前基于自定义 rq 的工作流程,但我不确定设计它的最佳方式。或者如果使用 Airflow 甚至有意义。用例是:

  1. 我们收到用户上传的数据。
  2. 根据收到的数据类型,我们可选择运行零个或多个作业
  3. 如果收到特定的数据类型组合,每个作业都会运行。它为该用户运行的时间范围由收到的数据决定
  4. 作业从数据库中读取数据并将结果写入数据库。
  5. 这些工作可能会触发更多工作。

例如

数据上传后,我们将一个项目放入队列中:

上传:

user: 'a'
data:
- type: datatype1
start: 1
end: 3
- type: datatype2
start: 2
end: 3

这会触发:

  • job1,用户“a”,开始:1,结束:3
  • job2,用户“a”,开始:2,结束:3

然后 job1 可能会有一些清理作业在它之后运行。(如果能够将作业限制为仅在没有为同一用户运行其他作业时运行,那将是一件好事。)

我考虑过的方法:

1.

当数据上传到达消息队列时触发 DAG。

然后此 DAG 确定要运行哪些依赖作业,并将用户和时间范围作为参数(或 xcom)传递。

2.

当数据上传到达消息队列时触发 DAG。

然后,此 DAG 根据用户和时间范围内的数据类型和模板为作业动态创建 DAGS。

因此您可以获得每个用户、工作、时间范围组合的动态 DAG。


我什至不确定如何从消息队列中触发 DAG……并且发现很难找到与此用例类似的示例。也许那是因为 Airflow 不适合?

任何帮助/想法/建议将不胜感激。

谢谢。

最佳答案

Airflow 是围绕基于时间的时间表构建的。它不是为了根据数据的着陆来触发运行而构建的。有其他系统设计用于执行此操作。我听说过 pachyderm.io 或 dvs.org 之类的东西。甚至重新利用 CI 工具或自定义 Jenkins 设置都可以根据文件更改事件或消息队列触发。

但是您可以尝试通过使用外部队列监听器来使用 Airflow rest API calls to Airflow触发 DAG。例如,如果队列是 AWS SNS 队列,您可以使用简单的 Python 中的 AWS Lambda 监听器执行此操作。

我建议每个作业类型(或者它是用户,以较小者为准)一个 DAG,触发器逻辑根据队列确定它是正确的。如果有常见的清理任务等,DAG 可能会使用 TriggerDagRunOperator启动这些,或者您可能只有一个公共(public)库,其中包含每个 DAG 包含的那些清理任务。我认为后者更清洁。

DAG 可以将其任务限制在某些池中。您可以为每个用户创建一个池,以限制每个用户的作业运行。或者,如果每个用户有一个 DAG,则可以将该 DAG 的最大并发 DAG 运行设置为合理的值。

关于queue - 以用户为中心的工作流程的 Airflow DAG 设计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53744711/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com