gpt4 book ai didi

python - 当子文件夹具有相同名称时,Airflow Packaged Dags(压缩)会发生冲突

转载 作者:太空宇宙 更新时间:2023-11-03 20:43:22 26 4
gpt4 key购买 nike

我们正在建立一个 Airflow 框架,多个数据科学家团队可以在其中协调他们的数据处理管道。我们开发了一个 Python 代码库来帮助他们实现 DAG,其中包括各种包和模块中的函数和类(还有 Operator 子类)。

每个团队都会将自己的 DAG 与包中的函数和类一起打包在 ZIP 文件中。例如,第一个 ZIP 文件将包含

ZIP1:

main_dag_teamA.py

subfolder1: package1-with-generic-functions + init.py

subfolder2: package2-with-generic-operators + init.py

另一个 ZIP 文件将包含

ZIP2:

main_dag_teamB.py

subfolder1: package1-with-generic-functions + init.py

subfolder2: package2-with-generic-operators + init.py

请注意,在两个 ZIP 文件中,子文件夹 1 和子文件夹 2 通常完全相同,这意味着具有相同功能和类的完全相同的文件。但随着时间的推移,当新版本的软件包可用时,DAG 软件包中的软件包内容将开始出现偏差。

通过此设置,我们遇到了以下问题:当包/子文件夹的内容在 ZIP 中开始出现偏差时,Airflow 似乎无法很好地处理同名包。因为当我运行“airflow list_dags”时,它会显示如下错误:

File "/data/share/airflow/dags/program1/program1.zip/program1.py", line 1, in > from subfolder1.functions1 import function1ImportError: No module named 'subfolder1.functions1'

可以使用以下代码重现问题,其中两个小 DAG 与包 my_functions 一起位于其 ZIP 文件中,该包具有相同的名称,但内容不同。

DAG 包 ZIP 1:

程序1.py

from my_functions.functions1 import function1

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
print('program1')

dag = DAG(
'program1',
schedule_interval=None,
catchup=False,
start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program1_task1', python_callable=do_it, dag=dag)

my_functions/functions1.py:

def function1():
print('function1')

DAG 包 ZIP 2:

程序2.py:

from my_functions.functions2 import function2

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
print('program1')

dag = DAG(
'program1',
schedule_interval=None,
catchup=False,
start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program2_task2', python_callable=do_it, dag=dag)

my_functions/functions2.py:

def function2():
print('function2')

使用这两个 ZIP 文件,当我运行“airflow list_dags”时,它显示错误:

文件“/data/share/airflow/dags/program1/program1.zip/program1.py”,第 1 行,位于from subfolder1.functions1 import function1 ImportError:没有名为“subfolder1.functions1”的模块

当 ZIP 中的子文件夹内容相同时,不会发生错误。

我的问题:如何防止 ZIP 中的子文件夹发生冲突?我真的很希望拥有完全代码独立的 DAG,以及它们自己的软件包版本。

最佳答案

通过在 DAG 顶部(program1.py 和 program2.py)、在

之前执行以下操作来解决
from my_functions.functions1 import function1

from my_functions.functions2 import function2

代码:

import sys

# Cleanup up the already imported function module
cleanup_mods = []
for mod in sys.modules:
if mod.startswith("function"):
cleanup_mods.append(mod)
for mod in cleanup_mods:
del sys.modules[mod]

这可以确保每次解析 DAG 时,导入的库都会被清理。

关于python - 当子文件夹具有相同名称时,Airflow Packaged Dags(压缩)会发生冲突,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56744582/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com