gpt4 book ai didi

python - 如何使用 aiobotocore 模拟 AWS S3

转载 作者:行者123 更新时间:2023-11-28 21:39:41 27 4
gpt4 key购买 nike

我有一个项目使用 aiohttp 和 aiobotocore 来处理 AWS 中的资源。我正在尝试测试适用于 AWS S3 的类,并且我正在使用 moto 来模拟 AWS。模拟适用于使用同步代码的示例(来自 moto 文档的示例)

import boto3
from moto import mock_s3


class MyModel(object):
def __init__(self, name, value):
self.name = name
self.value = value

def save(self):
s3 = boto3.client('s3', region_name='us-east-1')
s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)


def test_my_model_save():
with mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='mybucket')

model_instance = MyModel('steve', 'is awesome')
model_instance.save()
body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")

assert body == 'is awesome'

但是,在重写它以使用 aiobotocore 模拟之后不起作用 - 在我的示例中它连接到真实的 AWS S3。

import aiobotocore
import asyncio

import boto3
from moto import mock_s3


class MyModel(object):
def __init__(self, name, value):
self.name = name
self.value = value

async def save(self, loop):
session = aiobotocore.get_session(loop=loop)
s3 = session.create_client('s3', region_name='us-east-1')
await s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)


def test_my_model_save():
with mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='mybucket')
loop = asyncio.get_event_loop()

model_instance = MyModel('steve', 'is awesome')
loop.run_until_complete(model_instance.save(loop=loop))
body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")

assert body == 'is awesome'

所以我在这里的假设是 moto 不能与 aiobotocore 一起正常工作。如果我的源代码类似于第二个示例,我该如何有效地模拟 AWS 资源?

最佳答案

来自 moto 的模拟不起作用,因为它们使用同步 API。但是,您可以启动 moto 服务器并配置 aiobotocore 以连接到此测试服务器。 Take a look on aiobotocore tests寻找灵感。

关于python - 如何使用 aiobotocore 模拟 AWS S3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46420709/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com