gpt4 book ai didi

python - Apache Airflow——在多个地方使用的自定义类

转载 作者:行者123 更新时间:2023-12-05 07:23:05 26 4
gpt4 key购买 nike

我正在使用 Airflow 1.10.2。我正在尝试定义一个自定义模块,该模块将包含可在多个 dag 和运算符中使用的通用功能。

一个具体的例子可以是一个enum。我想在自定义运算符中使用它(以修改其行为)。但我也想在 dag 定义中使用它,它可以用作参数。

这是我当前的层次结构

airflow_home
| - dags/
- __init__.py
- my_dag.py
| - plugins/
- operators/
- __init__.py
- my_operator.py
- common/
- __init__.py
- my_enum.py

假设我想定义一个枚举(在 my_enum.py 模块中):

class MyEnum(Enum):
OPTION_1 = 1
OPTION_2 = 2

它被导入到运算符(在 my_operator.py 中)为:

from common.my_enum import MyEnum

并以相同的方式进入 dag(在 my_dag.py 中):

from common.my_enum import MyEnum

奇怪的是(?),这对我有用。但是,我非常不确定这是否是做这种事情的正确方法。一位同事告诉我,他过去曾尝试这样做(可能是在旧版本的 Airflow 上)但它不起作用(当 Airflow 开始时“破损”)。因此,恐怕它在未来或在特定条件下可能不会(可能会停止)工作,因为它既不是运算符(operator),也不是传感器等。

我没有找到任何关于如何分离共享行为的指南。我发现 Airflow 导入系统非常复杂而且不是很直接。我理想的解决方案是将模块 common 移动到与 dagsoperators 相同的级别。

另外,我不太确定如何从文档中解释这句话:插件文件夹中的 python 模块被导入,钩子(Hook)、运算符、传感器、宏、执行器和 Web View 被集成到 Airflow 的主要集合并可供使用。 这是否意味着我的方法是正确的,因为 plugins/ 中的任何 python 模块都被导入了?

这是实现我的目标的好方法,还是有更好的解决方案?

谢谢你的建议

最佳答案

这样做有点古怪。

正确的方法是首先创建一个 hookoperator它将使用这个钩子(Hook)。对于如下所示的更简单的情况,您甚至不需要在运算符中调用 Hook 。

#1。 放置

<PROJECT NAME>/<PLUGINS_FOLDER>/<PLUGIN NAME>/__init__.py
<PROJECT NAME>/<PLUGINS_FOLDER>/<PLUGIN NAME>/<some_new>_hook.py
<PROJECT NAME>/<PLUGINS_FOLDER>/<PLUGIN NAME>/<some_new>_operator.py

对于看起来像这样的真实案例场景:

CRMProject/crm_plugin/__init__.py
CRMProject/crm_plugin/crm_hook.py
CRMProject/crm_plugin/customer_operator.py

#2。 代码

CRMProject/crm_plugin/__init__.py的示例代码:

# CRMProject/crm_plugin/__init__.py
from airflow.plugins_manager import AirflowPlugin
from crm_plugin.crm_hook import CrmHook
from crm_plugin.customer_operator import CreateCustomerOperator, DeleteCustomerOperator, UpdateCustomerOperator


class AirflowCrmPlugin(AirflowPlugin):
name = "crm_plugin" # does not need to match the package name
operators = [CreateCustomerOperator, DeleteCustomerOperator, UpdateCustomerOperator]
sensors = []
hooks = [CrmHook]
executors = []
macros = []
admin_views = []
flask_blueprints = []
menu_links = []
appbuilder_views = []
appbuilder_menu_items = []
global_operator_extra_links = []
operator_extra_links = []

钩子(Hook)类的示例代码 - CRMProject/crm_plugin/crm_hook.py .永远不要直接从 system\API 调用它。为此使用 Operator(见下文)。

from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
from crm_sdk import crm_api # import external libraries to interact with target system


class CrmHook(BaseHook):
"""
Hook to interact with the ACME CRM System.
"""

def __init__(self, ...):
# your code goes here

def insert_object(self, ...):
"""
Insert an object into the CRM system
"""
# your code goes here

def update_object(self, ...):
"""
Update an object into the CRM system
"""
# your code goes here

def delete_object(self, ...):
"""
Delete an object into the CRM system
"""
# your code goes here

def extract_object(self, ...):
"""
Extract an object into the CRM system
"""
# your code goes here

您将在 DAG 中使用的运算符示例代码 (CRMProject/crm_plugin/customer_operator.py)。运算符要求您实现一个执行方法。这是 Airflow 操作符的入口点,它会在 DAG 中的任务执行时调用。apply_defaults装饰器包装 __init__将在 DAG 脚本中设置的 DAG 默认值应用到运行时运算符的任务实例的类的方法。

我们还可以设置两个重要的类属性。这些是 templated_fieldstemplate_ext .这两个属性是可迭代对象,应包含字段的字符串值和/或文件扩展名,这些字段和/或文件扩展名将允许使用 Airflow 中的 jinja 模板支持进行模板化。

from airflow.exceptions import AirflowException
from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defauls

from crm_plugin.crm_hook import CrmHook


class CreateCustomerOperator(BaseOperator):
"""
This operator creates a new customer in the ACME CRM System.
"""
template_fields = ['first_contact_date', 'bulk_file']
template_ext = ['.csv']

@apply_defaults
def __init__(self, first_contact_date, bulk_file, ...):
# your code goes here

def _customer_exist(self, ...):
"""
Helper method to check if a customer exist. Raises an exception if it does.
"""
# your code goes here

def execute(self, context):
"""
Create a new customer in the CRM system.
"""
# your code goes here

为了简化执行方法,您可以根据需要在类中创建任意数量的方法。优秀类(class)设计的相同原则在这里仍然很重要。

#3。 部署和使用您的插件

一旦您完成了插件的工作,剩下要做的就是复制您的 <PLUGIN NAME>包文件夹到 Airflow 插件文件夹。 Airflow 将选择插件并将其提供给您的 DAG。如果我们将简单的 CRM 插件复制到我们的 plugins_folder文件夹结构如下所示。

<plugins_folder>/crm_plugin/__init__.py
<plugins_folder>/crm_plugin/crm_hook.py
<plugins_folder>/crm_plugin/customer_operator.py

为了使用您的新插件,您只需使用以下语句导入您的 Operators 和 Hooks。

from airflow.hooks.crm_plugin import CrmHook
from airflow.operators.crm_plugin import CreateCustomerOperator, DeleteCustomerOperator, UpdateCustomerOperator

Source

关于python - Apache Airflow——在多个地方使用的自定义类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56221053/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com