gpt4 book ai didi

celery - Airflow - 如何让工作人员完成所有 dag run 任务?

转载 作者:行者123 更新时间:2023-12-05 05:12:07 29 4
gpt4 key购买 nike

我目前正在使用 Airflow 和 Celery 处理文件。工作人员需要下载文件、处理它们并在之后重新上传它们。我的 DAG 只需要一名 worker 就可以了。但是当我添加一个时,事情就变得复杂了。

工作人员在有空时接受任务。 Worker1 可以承担“处理下载的文件”任务,但 Worker2 承担了“下载文件”任务,因此任务失败,因为它无法处理不存在的文件。

有没有办法向工作人员(或调度程序)指定 DAG 必须仅在一个工作人员上运行?我知道队列。但我已经在使用它们了。

最佳答案

在这种情况下,您可以使用 Airflow 变量来保存所有工作节点的名称。例如:

  • 变量:worker_list
  • 值:boxA, boxB, boxC

运行 Airflow worker 时,您可以指定多个作业队列。例如:airflow worker job_queue1,job_queue2对于你的情况,我将运行 airflow worker af_<hostname>

在你的 DAG 代码中,只需要获取那个 worker_list Airflow 变量,随机选择一个框,然后将你所有的作业排队到 af_<random_selected_box>排队

关于celery - Airflow - 如何让工作人员完成所有 dag run 任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54905657/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com