gpt4 book ai didi

airflow - 无法导入 Airflow 插件

转载 作者:行者123 更新时间:2023-12-02 09:24:56 25 4
gpt4 key购买 nike

以下 Airflow 教程 here .

问题:网络服务器返回以下错误

Broken DAG: [/usr/local/airflow/dags/test_operator.py] cannot import name 
MyFirstOperator

注释:目录结构如下所示:

airflow_home
├── airflow.cfg
├── airflow.db
├── dags
│ └── test_operators.py
├── plugins
│ └── my_operators.py
└── unittests.cfg

我正在尝试像这样导入“test_operators.py”中的插件:

from airflow.operators import MyFirstOperator

代码与教程中的代码完全相同。

最佳答案

在努力阅读 Airflow 文档并尝试此处的一些答案但没有成功后,我发现 this approach from astronomer.io .

正如他们所指出的,构建 Airflow 插件可能会令人困惑,而且可能不是 future 添加 Hook 和操作符的最佳方式。

Custom hooks and operators are a powerful way to extend Airflow to meet your needs. There is however some confusion on the best way to implement them. According to the Airflow documentation, they can be added using Airflow’s Plugins mechanism. This however, overcomplicates the issue and leads to confusion for many people. Airflow is even considering deprecating using the Plugins mechanism for hooks and operators going forward.

因此,我没有乱搞插件 API,而是遵循天文学家的方法,设置 Airflow ,如下所示。

dags
└── my_dag.py (contains dag and tasks)
plugins
├── __init__.py
├── hooks
│ ├── __init__.py
│ └── mytest_hook.py (contains class MyTestHook)
└── operators
├── __init__.py
└── mytest_operator.py (contains class MyTestOperator)

通过这种方法,我的运算符和 Hook 的所有代码都完全位于各自的文件中 - 并且没有令人困惑的插件文件。所有 __init__.py 文件都是空的(与将插件代码放入其中一些同样令人困惑的方法不同)。

对于所需的导入,请考虑Airflow实际如何使用插件目录:

When Airflow is running, it will add dags/, plugins/, and config/ to PATH

这意味着执行 from airflow.operators.mytest_operator import MyTestOperator 可能行不通。相反,from Operator.mytest_operator import MyTestOperator 是可行的方法(请注意与我上面的设置中from directory/file.py import Class 的对齐)。

我的文件中的工作片段如下所示。

my_dag.py:

from airflow import DAG
from operators.mytest_operator import MyTestOperator
default_args = {....}
dag = DAG(....)
....
mytask = MyTestOperator(task_id='MyTest Task', dag=dag)
....

my_operator.py:

from airflow.models import BaseOperator
from hooks.mytest_hook import MyTestHook

class MyTestOperator(BaseOperator):
....
hook = MyTestHook(....)
....

my_hook.py:

class MyTestHook():
....

这对我有用,并且比尝试子类化 AirflowPlugin 简单得多。但是,如果您想要更改网络服务器 UI,它可能对您不起作用:

Note: The Plugins mechanism still must be used for plugins that make changes to the webserver UI.

顺便说一句,我之前遇到的错误(现已解决):

ModuleNotFoundError: No module named 'mytest_plugin.hooks.mytest_hook'
ModuleNotFoundError: No module named 'operators.mytest_plugin'

关于airflow - 无法导入 Airflow 插件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43907813/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com