gpt4 book ai didi

python - 如何在另一个 dag apache Airflow 中创建 dags

转载 作者:行者123 更新时间:2023-12-03 08:47:32 32 4
gpt4 key购买 nike

我正在尝试拥有一个主 dag,它将根据我的需要创建更多 dags。我在 airflow.cfgdags_folder 中有以下 python 文件。此代码在数据库中创建主 dag。该主 dag 应该读取文本文件,并为文本文件中的每一行创建 dag。但在主 dag 内创建的 dags 不会添加到数据库中。正确的创建方法是什么?

版本详细信息:

Python版本:3.7

Apache Airflow 版本:1.10.8

import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

root_dir = "/home/user/TestSpace/airflow_check/res"

print("\n\n ===> \n Dag generator")

default_args = {
'owner': 'airflow',
'start_date': dt.datetime(2020, 3, 22, 00, 00, 00),
'concurrency': 1,
'retries': 0
}


def greet(_name):
message = "Greetings {} at UTC: {} Local: {}\n".format(_name, dt.datetime.utcnow(), dt.datetime.now())
f = open("{}/greetings.txt".format(root_dir), "a+")
print("\n\n =====> {}\n\n".format(message))
f.write(message)
f.close()


def create_dag(dag_name):
with DAG(dag_name, default_args=default_args,
schedule_interval='*/2 * * * *',
catchup=False
) as i_dag:
i_opr_greet = PythonOperator(task_id='greet', python_callable=greet,
op_args=["{}_{}".format("greet", dag_name)])
i_echo_op = BashOperator(task_id='echo', bash_command='echo `date`')

i_opr_greet >> i_echo_op
return i_dag


def create_all_dags():
all_lines = []
f = open("{}/../dag_names.txt".format(root_dir), "r")
for x in f:
all_lines.append(str(x))
f.close()

for line in all_lines:
print("Dag creation for {}".format(line))
globals()[line] = create_dag(line)


with DAG('master_dag', default_args=default_args,
schedule_interval='*/1 * * * *',
catchup=False
) as dag:
echo_op = BashOperator(task_id='echo', bash_command='echo `date`')
create_op = PythonOperator(task_id='create_dag', python_callable=create_all_dags)
echo_op >> create_op

最佳答案

您有 2 个选择:

  1. 使用 SubDagOperator:Example DAG 。如果您的计划间隔可以相同,请使用它。
  2. 编写 Python DAG 文件:在掌握 DAG 后,在包含 DAG 的 AIRFLOW_HOME 中创建 Python 文件。为此,您可以使用 Jinja2 模板引擎。

关于python - 如何在另一个 dag apache Airflow 中创建 dags,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60827864/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com