
airflow - How to dynamically create SubDAGs in Airflow

Reposted · Author: 行者123 · Updated: 2023-12-01 12:15:19

I have a main DAG that retrieves a file and splits the data in that file into separate CSV files.
For each of these CSV files I have to run another set of tasks (e.g. upload to GCS, insert into BigQuery).
How can I dynamically generate a SubDAG for each file, based on the number of files? The SubDAG would define tasks such as uploading to GCS, inserting into BigQuery, and deleting the CSV file.

So right now, this is what it looks like:

main_dag = DAG(....)
download_operator = SFTPOperator(dag=main_dag, ...)   # downloads file
transform_operator = PythonOperator(dag=main_dag, ...)  # splits data and writes csv files

def subdag_factory():
    # Will return a subdag with tasks for uploading to GCS, inserting to BigQuery.
    ...

How do I call subdag_factory for each file generated in transform_operator?

Best Answer

I tried creating SubDAGs dynamically as follows:

# create and return a DAG
def create_subdag(dag_parent, dag_id_child_prefix, db_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix + db_name)
    default_args_copy = default_args.copy()

    # dag
    dag = DAG(dag_id=dag_id_child,
              default_args=default_args_copy,
              schedule_interval='@once')

    # operators
    tid_check = 'check2_db_' + db_name
    py_op_check = PythonOperator(task_id=tid_check, dag=dag,
                                 python_callable=check_sync_enabled,
                                 op_args=[db_name])

    tid_spark = 'spark2_submit_' + db_name
    py_op_spark = PythonOperator(task_id=tid_spark, dag=dag,
                                 python_callable=spark_submit,
                                 op_args=[db_name])

    py_op_check >> py_op_spark
    return dag

# wrap DAG into SubDagOperator
tid_prefix_subdag = 'subdag_'  # prefix shared by the task_id and the child dag_id

def create_subdag_operator(dag_parent, db_name):
    tid_subdag = tid_prefix_subdag + db_name
    subdag = create_subdag(dag_parent, tid_prefix_subdag, db_name)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op
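One detail worth noting: Airflow's SubDagOperator expects the child DAG's dag_id to be exactly '<parent_dag_id>.<task_id>' of the wrapping operator, which is why create_subdag builds the child id from the parent's dag_id plus the prefixed name. A minimal sketch of that naming rule (plain Python, no Airflow import needed):

```python
def subdag_dag_id(parent_dag_id, task_id):
    # SubDagOperator expects the child dag_id in this exact
    # '<parent>.<task_id>' form and complains at parse time otherwise.
    return '%s.%s' % (parent_dag_id, task_id)

print(subdag_dag_id('my_parent_dag', 'subdag_db1'))  # my_parent_dag.subdag_db1
```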

# create SubDagOperator for each db in db_names
def create_all_subdag_operators(dag_parent, db_names):
    subdags = [create_subdag_operator(dag_parent, db_name) for db_name in db_names]
    # chain subdag-operators together
    airflow.utils.helpers.chain(*subdags)
    return subdags
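airflow.utils.helpers.chain simply wires each operator to the next with >>, so the SubDagOperators run one after another. A rough stand-alone sketch of that behavior, using a stand-in Task class since Airflow itself is assumed unavailable here:

```python
class Task:
    """Stand-in for an Airflow operator; tracks downstream dependencies."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # mimic Airflow's `a >> b` dependency syntax
        self.downstream.append(other)
        return other

def chain(*tasks):
    # link every task to the one after it, like airflow.utils.helpers.chain
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream

ops = [Task('subdag_' + name) for name in ['db1', 'db2', 'db3']]
chain(*ops)
print([t.task_id for t in ops[0].downstream])  # ['subdag_db2']
```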


# (top-level) DAG & operators
dag = DAG(dag_id=dag_id_parent,
          default_args=default_args,
          schedule_interval=None)

subdag_ops = create_all_subdag_operators(dag, db_names)

Note that the list of inputs for which SubDAGs are created, here db_names, can either be declared statically in the Python file or read from an external source.
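For example, db_names could be read from a small JSON config file each time the DAG file is parsed. This is just a sketch; the file layout and the 'db_names' key are assumptions, not part of the original answer:

```python
import json

def load_db_names(config_path):
    # Read the list of databases to build SubDAGs for.
    # Expected file contents, e.g.: {"db_names": ["db1", "db2"]}
    with open(config_path) as f:
        return json.load(f)['db_names']
```

An Airflow Variable would work just as well as a file; either way, the list is re-evaluated on every DAG parse, so the set of SubDAGs follows the external source.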

The resulting DAG looks like this:

[image: graph view of the parent DAG with chained SubDagOperators]

Diving into the SubDAG(s):

[image: graph view of an individual SubDAG]

Regarding airflow - How to dynamically create SubDAGs in Airflow, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48947867/
