gpt4 book ai didi

python - 如何模拟 boto3 的 StreamingBody 对象以在 Python 中使用 BytesIO 进行处理?

转载 作者:行者123 更新时间:2023-12-04 04:23:35 24 4
gpt4 key购买 nike

我正在对一个函数进行单元测试,该函数将元素从 S3 对象转换为 Pandas DataFrame,并且需要模拟从 boto3 返回的 StreamingBody 对象

文件.py

def object_to_df(self, key_name, dtypes):
s3_object = self.get_object(key_name=key_name)
if s3_object is not None:
object_df = pandas.read_csv(
io.BytesIO(s3_object["Body"].read()), dtype=dtypes
)
return object_df

记录了 self.get_object(key_name) 的响应 here
{
'Body': StreamingBody(),
'DeleteMarker': True|False,
'AcceptRanges': 'string',
...
}

所以我需要模拟那个 StreamingBody() 对象并让我的模拟函数返回它。

测试.py

import unittest
import pandas
from io import StringIO
from unittest.mock import patch, Mock
from path.to.file import custom_class
from botocore.response import StreamingBody

class TestS3Class(unittest.TestCase):
"""TestCase for path_to/file.py"""

def setUp(self):
"""Creates an instance of the live class for testing"""
self.s3_test_client = S3()


@patch('path.to.class.get_object')
def test_object_to_df(self, mock_get_object):
""""""
mock_response = {'Body': [{'Candidate': 'Black Panther', 'Votes': 3},
{'Candidate': 'Captain America: Civil War', 'Votes': 8},
{'Candidate': 'Guardians of the Galaxy', 'Votes': 8},
{'Candidate': "Thor: Ragnarok", 'Votes': 1}
]}
mock_stream = StreamingBody(StringIO(str(mock_response)), len(str(mock_response)))
mock_get_object.return_value = mock_stream
self.assertIsInstance(self.s3_test_client.object_to_df(key_name='key_name', dtypes=str), pandas.DataFrame)

但我遇到了 TypeError: 'StreamingBody' object is not subscriptable
任何提示?

最佳答案

S3 客户端返回一个 dict,而您模拟的 S3 客户端返回一个 StreamingBody。你模拟的 S3 客户端应该返回类似的东西

body_json = {
'Body': [
{'Candidate': 'Black Panther', 'Votes': 3},
{'Candidate': 'Captain America: Civil War', 'Votes': 8},
{'Candidate': 'Guardians of the Galaxy', 'Votes': 8},
{'Candidate': "Thor: Ragnarok", 'Votes': 1}
]
}

body_encoded = json.dump(body_json).encode("utf-8")

body = StreamingBody(
StringIO(body_encoded),
len(body_encoded)
)

mocked_response = {
'Body': body,
...
}

mock_get_object.return_value = mocked_response

关于python - 如何模拟 boto3 的 StreamingBody 对象以在 Python 中使用 BytesIO 进行处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58476137/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com