gpt4 book ai didi

Python AWS SQS 与 MOTO 模拟

转载 作者:行者123 更新时间:2023-12-05 02:39:12 28 4
gpt4 key购买 nike

我正在尝试用 moto 模拟 AWS SQS,下面是我的代码

from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs


@mock_sqs
def test_get_all_msg_from_queue():

#from myClass import get_msg_from_sqs

conn = boto3.client('sqs', region_name='us-east-1')

queue = conn.create_queue(QueueName='Test')

os.environ["SQS_URL"] = queue["QueueUrl"]

queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))


#Tried this as well
#conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))

resp = get_msg_from_sqs(queue["QueueUrl"])

assert resp is not None

执行此操作时出现以下错误

>       queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E AttributeError: 'dict' object has no attribute 'send_message'

如果我尝试另一种方式在 SQS 中发送消息(参见注释掉的代码#Tried this as well)然后在我的方法 get_msg_from_sqs 中调用实际 SQS 时,出现以下错误

E  botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation:
The address https://queue.amazonaws.com/ is not valid for this endpoint.

我用 PyCharm 在 win10 上运行它,moto 版本设置为

moto = "^2.2.6"

我的代码如下

sqs = boto3.client('sqs')
def get_msg_from_queue(queue_url: str) -> dict:
return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)

我在这里错过了什么?

最佳答案

您的 queue 变量是 create_queue 返回的字典:

queue = conn.create_queue(QueueName='Test')

它不是队列,因此您不能对其调用 sendMessage

为此,您需要创建一个队列对象:

sqs = boto3.resource('sqs')
response = conn.create_queue(QueueName='Test')
queue_url = response["QueueURL"]
queue = sqs.Queue(queue_url)

queue.send_message()

关于Python AWS SQS 与 MOTO 模拟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69213098/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com