- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
最近,我将 Airflow 从 1.9 升级到 1.10.3(最新版本)。
但是我确实注意到与 SubDag 并发相关的性能问题。 SubDag 内只能拾取 1 个任务,这不是应该的方式,我们为 SubDag 设置的并发数是 8。
请参阅以下内容:get_monthly_summary-214
和 get_monthly_summary-215
是两个 SubDags,它可以通过父 dag 并发在并行 Controller 中运行
但是当放大 SubDag 时,输入 get_monthly_summary-214
,然后 你绝对可以看到,一次只有 1 个任务在运行,其他任务都在排队,并且以这种方式一直运行。当我们检查 SubDag 并发数时,它实际上是我们在代码中指定的 8:
我们确实设置了池槽大小,它是 32,我们有 8 个 celery 工作线程来接收排队的任务,并且我们与并发相关的 Airflow 配置如下:
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor
# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
worker_concurrency = 16
此外,所有 SubDag 均使用名为 mini
的队列进行配置,而其所有内部任务都是名为 default
的默认队列,因为我们可能会使用一些 deadlock problems如果我们在同一个队列上运行 SubDag 运算符和 SubDag 内部任务之前。我还尝试对所有任务和运算符使用default
队列,但没有帮助。
旧版本1.9似乎很好,每个SubDag可以并行执行多个任务,我们错过了什么吗?
最佳答案
根据上面发布的@kaxil 的发现,如果您仍然想并行执行子dag 内的任务,一个解决方案是创建一个包装函数来显式传递 executor
当构建SubDagOperator
:
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import GetDefaultExecutor
def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
return SubDagOperator(subdag=subdag, executor=GetDefaultExecutor(), *args, **kwargs)
调用sub_dag_operator_with_default_executor
当您创建 subdag 运算符时。为了解除sub dag操作符performance concerns
We should change the default executor for subdag_operator to SequentialExecutor. Airflow pool is not honored by subdagoperator, hence it could consume all the worker resources(e.g in celeryExecutor). This causes issues mentioned in airflow-74 and limits the subdag_operator usage. We use subdag_operator in production by specifying using sequential executor.
我们建议创建一个特殊的队列(在我们的例子中指定queue='mini')和celeryworker来处理subdag_operator,这样它就不会消耗所有普通celeryworker的资源。如下:
dag = DAG(
dag_id=DAG_NAME,
description=f"{DAG_NAME}-{__version__}",
...
)
with dag:
ur_operator = sub_dag_operator_with_default_executor(
task_id=f"your_task_id",
subdag=load_sub_dag(
parent_dag_name=DAG_NAME,
child_dag_name=f"your_child_dag_name",
args=args,
concurrency=dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY,
),
queue="mini",
dag=dag
)
然后,当您创建特殊的 celery Worker 时(我们使用的是轻量级主机,如 2 核和 3G 内存),请指定 AIRFLOW__CELERY__DEFAULT_QUEUE
如mini
,取决于您希望并行运行多少个 sub dag 运算符,您应该创建多个特殊的 celery 工作人员来平衡资源的负载,我们建议,每个特殊的 celery 工作人员一次最多应处理 2 个子 dag 运算符,或者它将被耗尽(例如,在 2 核和 3G 内存主机上耗尽内存)
您也可以调整concurrency
通过 ENV VAR concurrency_in_sub_dag
在你的 subdag 中在 Airflow UI Variables
中创建配置页面。
更新[2020年5月22日]以上仅适用于 Airflow (<=1.10.3,>=1.10.0)对于 1.10.3 之前的 Airflow ,请使用
from airflow.executors import get_default_executor
相反。
关于Airflow 1.10.3 SubDag 即使并发数为 8,也只能并行运行 1 个任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56051276/
Task.WaitAll 方法等待所有任务,Task.WaitAny 方法等待一个任务。如何等待任意N个任务? 用例:下载搜索结果页面,每个结果都需要一个单独的任务来下载和处理。如果我使用 WaitA
我正在查看一些像这样的遗留 C# 代码: await Task.Run(() => { _logger.LogException(LogLevel.Error, mes
如何在 Linux 中运行 cron 任务? 关注此Q&A ,我有这个 cron 任务要运行 - 只是将一些信息写入 txt 文件, // /var/www/cron.php $myfile = fo
原谅我的新手问题,但我想按顺序执行三个任务并在剧本中使用两个角色: 任务 角色 任务 角色 任务 这是我到目前为止(任务,角色,任务): --- - name: Task Role Task ho
我有一个依赖于 installDist 的自定义任务 - 不仅用于执行,还依赖于 installDist 输出: project.task('run', type: JavaExec, depends
从使用 Wix 创建的 MSI 运行卸载时,我需要在尝试删除任何文件之前强行终止在后台运行的进程。主要应用程序由一个托盘图标组成,它反射(reflect)了 bg 进程监控本地 Windows 服务的
我想编写 Ant 任务来自动执行启动服务器的任务,然后使用我的应用程序的 URL 打开 Internet Explorer。 显然我必须执行 startServer先任务,然后 startApplic
使用 ASP.NET 4.5,我正在尝试使用新的 async/await 玩具。我有一个 IDataReader 实现类,它包装了一个特定于供应商的阅读器(如 SqlDatareader)。我有一个简
使用命令 gradle tasks可以得到一份所有可用任务的报告。有什么方法可以向此命令添加参数并按任务组过滤任务。 我想发出类似 gradle tasks group:Demo 的命令筛选所有任务并
除了sshexec,还有什么办法吗?任务要做到这一点?我知道您可以使用 scp 复制文件任务。但是,我需要执行其他操作,例如检查是否存在某些文件夹,然后将其删除。我想使用类似 condition 的东
假设我有字符串 - "D:\ApEx_Schema\Functions\new.sql@@\main\ONEVIEW_Integration\3" 我需要将以下内容提取到 diff 变量中 - 文档名
我需要编写一个 ant 任务来确定某个文件是否是只读的,如果是,则失败。我想避免使用自定义选择器来为我们的构建系统的性质做这件事。任何人都有任何想法如何去做?我正在使用 ant 1.8 + ant-c
这是一个相当普遍的计算机科学问题,并不特定于任何操作系统或框架。 因此,我对与在线程池上切换任务相关的开销感到有些困惑。在许多情况下,给每个作业分配自己的特定线程是没有意义的(我们不想创建太多硬件线程
我正在使用以下 Ansible playbook 一次性关闭远程 Ubuntu 主机列表: - hosts: my_hosts become: yes remote_user: my_user
如何更改 Ant 中的当前工作目录? Ant documentation没有 任务,在我看来,最好的做法是不要更改当前工作目录。 但让我们假设我们仍然想这样做——你会如何做到这一点?谢谢! 最佳答案
是否可以运行 cronjob每三天一次?或者也许每月 10 次。 最佳答案 每三天运行一次 - 或更短时间在月底运行一次。 (如果上个月有 31 天,它将连续运行 2 天。) 0 0 */3 * *
如何在 Gradle 任务中执行托管在存储库中的工具? 在我的具体情况下,我正在使用 Gradle 构建一个 Android 应用程序。我添加了一项任务,将一些 protobuf 数据从文本编码为二进
我的项目有下一个结构: Root |- A |- C (depends on A) \- B (depends on A) 对于所有子项目,我们使用自己的插件生成资源:https://githu
我设置了一个具有4个节点的Hadoop群集,其中一个充当HDFS的NameNode以及Yarn主节点。该节点也是最强大的。 现在,我分发了2个文本文件,一个在node01(名称节点)上,一个在node
在 TFS 2010 中为多个用户存储任务的最佳方式是什么?我只能为一项任务分配一个。 (例如:当我计划向所有开发人员演示时) (这是一个 Scrum Msf 敏捷项目,其中任务是用户故事的一部分)
我是一名优秀的程序员,十分优秀!