gpt4 book ai didi

python - 如何控制 Airflow 安装的并行性或并发性?

转载 作者:行者123 更新时间:2023-12-04 13:15:32 26 4
gpt4 key购买 nike

在我的一些 Apache Airflow 安装中,即使调度程序似乎没有完全加载,计划运行的 DAG 或任务也不会运行。如何增加可并发运行的 DAG 或任务的数量?
同样,如果我的安装处于高负载下,并且我想限制我的 Airflow 工作人员拉取排队任务的速度(例如减少资源消耗),我可以调整什么来减少平均负载?

最佳答案

这是自 Airflow v1.10.2 以来可用的配置选项的扩展列表。有些可以在每个 DAG 或每个运算符(operator)的基础上设置,但在未指定时也可能回退到设置范围的默认值。

可以在每个 DAG 的基础上指定的选项 :

  • concurrency :允许在 DAG 的所有事件运行中同时运行的任务实例数。如果未设置,则默认为 core.dag_concurrency
  • max_active_runs:此 DAG 的最大事件运行次数。一旦达到此限制,调度程序将不会创建新的事件 DAG 运行。如果未设置,则默认为 core.max_active_runs_per_dag

  • 例子:
    # Only allow one run of this DAG to be running at any given time
    dag = DAG('my_dag_id', max_active_runs=1)

    # Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
    dag = DAG('example2', concurrency=10, max_active_runs=2)

    可以在每个运算符(operator)的基础上指定的选项 :
  • pool:在其中执行任务的池。Pools 可用于限制仅任务
  • 子集的并行性
  • task_concurrency:跨多个 DAG 运行的同一任务的并发限制

  • 例子:
    t1 = BaseOperator(pool='my_custom_pool', task_concurrency=12)

    在整个 Airflow 设置 中指定的选项 :
  • core.parallelism:在整个 Airflow 安装中运行的最大任务数
  • core.dag_concurrency:每个 DAG 可以运行的最大任务数(跨多个 DAG 运行)
  • core.non_pooled_task_slot_count:分配给不在池中运行的任务的任务槽数
  • core.max_active_runs_per_dag:最大事件 DAG 运行次数,每个 DAG
  • scheduler.max_threads:调度程序进程应该使用多少线程来调度 DAG
  • celery.worker_concurrency:如果使用 CeleryExecutor
  • ,一个工作人员一次将处理的最大任务实例数
  • celery.sync_parallelism:CeleryExecutor 用于同步任务状态的进程数
  • 关于python - 如何控制 Airflow 安装的并行性或并发性?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56370720/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com