gpt4 book ai didi

Airflow 1.10.3 SubDag 即使并发数为 8,也只能并行运行 1 个任务

转载 作者:行者123 更新时间:2023-12-02 21:25:20 25 4
gpt4 key购买 nike

最近,我将 Airflow 从 1.9 升级到 1.10.3(最新版本)。

但是我确实注意到与 SubDag 并发相关的性能问题。 SubDag 内只能拾取 1 个任务,这不是应该的方式,我们为 SubDag 设置的并发数是 8。

请参阅以下内容:get_monthly_summary-214get_monthly_summary-215 是两个 SubDags,它可以通过父 dag 并发在并行 Controller 中运行

enter image description here

但是当放大 SubDag 时,输入 get_monthly_summary-214,然后 enter image description here你绝对可以看到,一次只有 1 个任务在运行,其他任务都在排队,并且以这种方式一直运行。当我们检查 SubDag 并发数时,它实际上是我们在代码中指定的 8: enter image description here

我们确实设置了池槽大小,它是 32,我们有 8 个 celery 工作线程来接收排队的任务,并且我们与并发相关的 Airflow 配置如下:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
worker_concurrency = 16

此外,所有 SubDag 均使用名为 mini 的队列进行配置,而其所有内部任务都是名为 default 的默认队列,因为我们可能会使用一些 deadlock problems如果我们在同一个队列上运行 SubDag 运算符和 SubDag 内部任务之前。我还尝试对所有任务和运算符使用default队列,但没有帮助。

旧版本1.9似乎很好,每个SubDag可以并行执行多个任务,我们错过了什么吗?

最佳答案

根据上面发布的@kaxil 的发现,如果您仍然想并行执行子dag 内的任务,一个解决方案是创建一个包装函数来显式传递 executor当构建SubDagOperator :

from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import GetDefaultExecutor

def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
return SubDagOperator(subdag=subdag, executor=GetDefaultExecutor(), *args, **kwargs)

调用sub_dag_operator_with_default_executor当您创建 subdag 运算符时。为了解除sub dag操作符performance concerns

We should change the default executor for subdag_operator to SequentialExecutor. Airflow pool is not honored by subdagoperator, hence it could consume all the worker resources(e.g in celeryExecutor). This causes issues mentioned in airflow-74 and limits the subdag_operator usage. We use subdag_operator in production by specifying using sequential executor.

我们建议创建一个特殊的队列(在我们的例子中指定queue='mini')和celeryworker来处理subdag_operator,这样它就不会消耗所有普通celeryworker的资源。如下:

 dag = DAG(
dag_id=DAG_NAME,
description=f"{DAG_NAME}-{__version__}",
...
)
with dag:
ur_operator = sub_dag_operator_with_default_executor(
task_id=f"your_task_id",
subdag=load_sub_dag(
parent_dag_name=DAG_NAME,
child_dag_name=f"your_child_dag_name",
args=args,
concurrency=dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY,
),
queue="mini",
dag=dag
)

然后,当您创建特殊的 celery Worker 时(我们使用的是轻量级主机,如 2 核和 3G 内存),请指定 AIRFLOW__CELERY__DEFAULT_QUEUEmini ,取决于您希望并行运行多少个 sub dag 运算符,您应该创建多个特殊的 celery 工作人员来平衡资源的负载,我们建议,每个特殊的 celery 工作人员一次最多应处理 2 个子 dag 运算符,或者它将被耗尽(例如,在 2 核和 3G 内存主机上耗尽内存)

您也可以调整concurrency通过 ENV VAR concurrency_in_sub_dag 在你的 subdag 中在 Airflow UI Variables 中创建配置页面。

更新[2020年5月22日]以上仅适用于 Airflow (<=1.10.3,>=1.10.0)对于 1.10.3 之前的 Airflow ,请使用

from airflow.executors import get_default_executor

相反。

关于Airflow 1.10.3 SubDag 即使并发数为 8,也只能并行运行 1 个任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56051276/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com