gpt4 book ai didi

python - 并行运行 Airflow 任务/dags

转载 作者:太空狗 更新时间:2023-10-29 21:44:58 32 4
gpt4 key购买 nike

我正在使用 airflow 编排一些 python 脚本。我有一个“主”dag,从中运行了几个 subdags。我的主要 dag 应该根据以下概述运行:

enter image description here

我已经通过使用以下几行在我的主 dag 中找到了这个结构:

etl_internal_sub_dag1 >> etl_internal_sub_dag2 >> etl_internal_sub_dag3
etl_internal_sub_dag3 >> etl_adzuna_sub_dag
etl_internal_sub_dag3 >> etl_adwords_sub_dag
etl_internal_sub_dag3 >> etl_facebook_sub_dag
etl_internal_sub_dag3 >> etl_pagespeed_sub_dag

etl_adzuna_sub_dag >> etl_combine_sub_dag
etl_adwords_sub_dag >> etl_combine_sub_dag
etl_facebook_sub_dag >> etl_combine_sub_dag
etl_pagespeed_sub_dag >> etl_combine_sub_dag

我想要 Airflow 做的是首先运行 etl_internal_sub_dag1 然后是 etl_internal_sub_dag2 然后是 etl_internal_sub_dag3。当 etl_internal_sub_dag3 完成后,我想要 etl_adzuna_sub_dagetl_adwords_sub_dagetl_facebook_sub_dagetl_pagespeed_sub_dag并行运行。最后,当这最后四个脚本完成后,我希望 etl_combine_sub_dag 运行。

但是,当我运行主 dag 时,etl_adzuna_sub_dagetl_adwords_sub_dagetl_facebook_sub_dagetl_pagespeed_sub_dag 都在运行一个接一个,不并行。

问题:如何确保脚本 etl_adzuna_sub_dagetl_adwords_sub_dagetl_facebook_sub_dagetl_pagespeed_sub_dag 是并行运行的吗?

编辑:我的default_argsDAG 如下所示:

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': start_date,
'end_date': end_date,
'email': ['myname@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}

DAG_NAME = 'main_dag'

dag = DAG(DAG_NAME, default_args=default_args, catchup = False)

最佳答案

您将需要使用 LocalExecutor

检查您的配置 (airflow.cfg),您可能正在使用连续执行任务的 SequentialExectuor

Airflow 使用后端数据库来存储元数据。检查您的 airflow.cfg 文件并查找 executor 关键字。默认情况下,Airflow 使用 SequentialExecutor 无论如何都会按顺序执行任务。因此,要允许 Airflow 并行运行任务,您需要在 Postges 或 MySQL 中创建一个数据库,并在 airflow.cfg (sql_alchemy_conn param)中对其进行配置,然后更改您的执行程序到 airflow.cfg 中的 LocalExecutor 然后运行 ​​airflow initdb

请注意,要使用 LocalExecutor,您需要使用 Postgres 或 MySQL 而不是 SQLite 作为后端数据库。

更多信息:https://airflow.incubator.apache.org/howto/initialize-database.html

If you want to take a real test drive of Airflow, you should consider setting up a real database backend and switching to the LocalExecutor. As Airflow was built to interact with its metadata using the great SqlAlchemy library, you should be able to use any database backend supported as a SqlAlchemy backend. We recommend using MySQL or Postgres.

关于python - 并行运行 Airflow 任务/dags,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52741536/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com