
python - plugins_manager.py error - cannot import plugin

Reposted. Author: 行者123. Updated: 2023-12-02 03:27:27

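I am trying to run some tests against a custom Apache Airflow hook that I created. The tests pass, but the strange error shown below appears (it does not affect the tests). This is odd because BrazeHook can be imported and used, and it does not cause errors in any other class or test.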

[2019-06-21 16:11:27,748] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=53175
[2019-06-21 16:11:29,243] {{plugins_manager.py:143}} ERROR - cannot import name 'BrazeHook'
Traceback (most recent call last):
  File "/xxxxx/xxx/venv/lib/python3.6/site-packages/airflow/plugins_manager.py", line 137, in <module>
    m = imp.load_source(namespace, filepath)
  File "/xxxxx/bloodflow/venv/lib/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 684, in _load
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/xxxx/xxx/plugins/operators/s3_to_braze_operator.py", line 9, in <module>
    from plugins.hooks.braze_hook import BrazeHook
ImportError: cannot import name 'BrazeHook'
[2019-06-21 16:11:29,246] {{plugins_manager.py:144}} ERROR - Failed to import plugin /xxxx/xxx/plugins/operators/x_to_braze_operator.py

Ran 1 test in 0.071s

OK
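
The log suggests that the plugin loader catches the exception raised while importing a plugin file, logs it, and moves on to the next file, which would explain why the test run still reports OK. Below is a minimal sketch of that pattern. It is not Airflow's actual code: `load_plugins`, `demo`, and the plugin file names are hypothetical, and `importlib` is used here in place of the `imp.load_source` call seen in the traceback.

```python
import importlib.util
import logging
import tempfile
from pathlib import Path

log = logging.getLogger("plugin_loader_sketch")


def load_plugins(folder):
    """Import every .py file in `folder`; log failures instead of raising."""
    loaded, failed = [], []
    for path in sorted(Path(folder).glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, str(path))
        module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(module)
        except Exception as exc:
            # The error is logged and swallowed, so the caller keeps running.
            log.error("Failed to import plugin %s: %s", path, exc)
            failed.append(path.name)
        else:
            loaded.append(path.name)
    return loaded, failed


def demo():
    """Load one working and one broken (hypothetical) plugin file."""
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "good_plugin.py").write_text("VALUE = 1\n")
        # This import fails because the package does not exist.
        Path(tmp, "bad_plugin.py").write_text(
            "from missing_pkg_for_demo import Thing\n"
        )
        return load_plugins(tmp)
```

Calling `demo()` returns the names of the files that loaded and the ones that failed. The broken file only produces a log entry, much like the `Failed to import plugin` line above.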

Class

import requests

# Not shown in the original post; assumed Airflow 1.10 import path.
from airflow.hooks.base_hook import BaseHook
from braze.client import BrazeClient, BrazeClientError, BrazeInternalServerError


class BrazeImportError(Exception):
    def __init__(self, message):
        """
        Error occurred while trying to import the users' data into Braze

        :param str message: error message returned by Braze or message out of any other exception.
        """
        self.message = message
        # Forward the message so that str(exc) and tracebacks show it.
        super(BrazeImportError, self).__init__(message)


class BrazeHook(BaseHook):

    def __init__(self, braze_conn_id='braze', *args, **kwargs):
        self.connection = self.get_connection(braze_conn_id)

    def track(self, attributes=None, events=None, purchases=None):
        """
        Adds/modifies user data through the /users/track endpoint of Braze

        :param attributes: dict or list of user attribute dicts {external_id, ...}
        :return: the result of client.user_track
        :raises BrazeImportError: wraps the various errors returned by Braze
        """
        try:
            client = BrazeClient(api_url=self.connection.host, api_key=self.connection.extra_dejson.get('api_key'))
            return client.user_track(attributes=attributes, events=events, purchases=purchases)
        except requests.exceptions.ConnectionError as ce:
            raise BrazeImportError(ce.args[0])
        except (BrazeClientError, BrazeInternalServerError) as be:
            raise BrazeImportError(message=str(be))
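
The hook wraps lower-level errors in a single domain exception. Here is a stand-alone sketch of that wrapping pattern. All names in it (`ImportFailed`, `SilentImportFailed`, `call_and_wrap`, `failing_call`) are hypothetical stand-ins, and the built-in `ConnectionError` takes the place of the `requests` and Braze client exceptions. It also shows that `str(exc)` is only populated when the message is passed on to `Exception.__init__`.

```python
class ImportFailed(Exception):
    """Hypothetical domain error, standing in for BrazeImportError."""

    def __init__(self, message):
        self.message = message
        # Passing the message on makes str(exc) and tracebacks show it.
        super().__init__(message)


class SilentImportFailed(Exception):
    """Same idea, but the message is not forwarded to Exception."""

    def __init__(self, message):
        self.message = message
        super().__init__()


def call_and_wrap(func):
    """Run func and re-raise connection problems as ImportFailed."""
    try:
        return func()
    except ConnectionError as exc:
        # "from exc" keeps the original error attached as __cause__.
        raise ImportFailed(str(exc)) from exc


def failing_call():
    """Stand-in for a client call that cannot reach the server."""
    raise ConnectionError("connection refused")
```

With this in place, `call_and_wrap(failing_call)` raises `ImportFailed` carrying the original text, whereas `str(SilentImportFailed("boom"))` is an empty string even though `.message` is set.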

Test

# Imports were not shown in the original post.
import unittest

import responses

from plugins.hooks.braze_hook import BrazeHook


class TestBrazeHook(unittest.TestCase):

    @responses.activate
    def test_request_path(self):
        responses.add(responses.POST, "https://brazeurl/users/track",
                      json={'errors': u'', u'message': u'success', 'status_code': 200, 'success': True}, status=201)

        handler = BrazeHook(braze_conn_id="braze")

        response = handler.track(attributes=[])

        self.assertEqual(201, response['status_code'])

Update: as requested in the comments, here is the operator referenced in the error message

# Imports were not shown in the original post, apart from the BrazeHook import
# quoted in the traceback; dd is assumed to be dask.dataframe.
import logging

import dask.dataframe as dd
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from plugins.hooks.braze_hook import BrazeHook


class S3ToBrazeOperator(BaseOperator):
    """
    Copies Reader Score based audiences from S3 to Braze
    """

    template_fields = ()
    template_ext = ()
    ui_color = '#ededed'

    @apply_defaults
    def __init__(self,
                 s3_bucket,
                 s3_path,
                 s3_conn_id,
                 braze_conn_id,
                 columns,
                 *args, **kwargs):
        super(S3ToBrazeOperator, self).__init__(*args, **kwargs)
        self.s3_bucket = s3_bucket
        self.s3_path = s3_path
        self.s3_conn_id = s3_conn_id
        self.braze_conn_id = braze_conn_id
        self.columns = columns

    def execute(self, context):
        request_handler = BrazeHook(braze_conn_id=self.braze_conn_id)

        ....

        scores = dd.read_csv(f"s3://{self.s3_bucket}/{self.s3_path}/*",
                             sep=";", compression="gzip", header=None, storage_options=options).compute()

        scores.columns = self.columns

        attributes = []
        # batches the results according to step size
        for batch in chunk(scores, BRAZE_API_STEP):

            batch.apply(lambda user: {
                "external_id": user.user_id,
                ....
            }, axis=1).apply(lambda attribute: attributes.append(attribute))

            try:
                request_handler.track(attributes=attributes)
            except Exception as e:
                logging.error("Error {}".format(str(e)))
                raise

Best answer

One thing you could try is to put an __init__.py file in the plugins directory with the following content:

from airflow.plugins_manager import AirflowPlugin

from plugins.hooks.braze_hook import BrazeHook
from plugins.operators.x_to_braze_operator import S3ToBrazeOperator


class BrazePlugin(AirflowPlugin):
    name = "braze_plugin"
    operators = [S3ToBrazeOperator]
    sensors = []
    hooks = [BrazeHook]
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []
    appbuilder_views = []
    appbuilder_menu_items = []

This might help, but I'm not 100% sure.
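
Another possibility, which the original post does not confirm, is a circular import. `cannot import name` (rather than `No module named`) is the message Python gives when the module is found but the name is not defined in it yet, for instance because the module is still being initialized. If importing the hook module triggers plugin loading before the `BrazeHook` class statement has run, the operator's import of the hook would fail in this way while later uses of the hook still work. Below is a self-contained sketch of that mechanism; the module names `demo_hook` and `demo_operator` and the function `reproduce_circular_import` are hypothetical.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path


def reproduce_circular_import():
    """Return (hook class name, error text recorded by the operator module)."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        # demo_hook stands in for the hook module: it triggers loading of the
        # operator module *before* its own class has been defined.
        (root / "demo_hook.py").write_text(textwrap.dedent("""
            import demo_operator  # stands in for a framework loading plugins

            class DemoHook:
                pass
        """))
        # demo_operator stands in for the plugin that imports the hook.
        (root / "demo_operator.py").write_text(textwrap.dedent("""
            try:
                from demo_hook import DemoHook
            except ImportError as exc:
                # Mimic a loader that records the error and carries on.
                ERROR = str(exc)
            else:
                ERROR = None
        """))
        importlib.invalidate_caches()
        sys.path.insert(0, tmp)
        try:
            hook = importlib.import_module("demo_hook")
            operator = importlib.import_module("demo_operator")
            return hook.DemoHook.__name__, operator.ERROR
        finally:
            # Leave the interpreter as we found it.
            sys.path.remove(tmp)
            for name in ("demo_hook", "demo_operator"):
                sys.modules.pop(name, None)
```

In this sketch the hook class is fully usable once the import of `demo_hook` completes, yet the operator module has already recorded a `cannot import name` error, which matches the symptom described in the question. If this is the cause, deferring the hook import to the point of use (for example, inside `execute`) is one common way to break such a cycle.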

Regarding "python - plugins_manager.py error - cannot import plugin", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56705429/
