gpt4 book ai didi

python - 测试 : testing a websocket connection

转载 作者:太空宇宙 更新时间:2023-11-04 03:01:04 31 4
gpt4 key购买 nike

我有这段代码

class RTMClient:
...
#not important code
...
async def connect(self, queue: asyncio.Queue):
"""
Connect to the websocket stream and iterate over the messages
dumping them in the Queue.
"""
ws_url=''#url aquisition amended to readability
try:
self._ws = await websockets.connect(ws_url)
while not self.is_closed:
msg = await self._ws.recv()
if msg is None:
break
await queue.put(json.loads(msg))

except asyncio.CancelledError:
pass
finally:
self._closed.set()
self._ws = None

我想为它写一个自动化测试。我打算做什么:

  1. Monkeypatch websockets.connect 返回模拟连接
  2. 使模拟连接从预定义列表返回模拟消息
  3. 将模拟连接设置为 is_closedTrue
  4. 断言 websocket 连接已关闭
  5. 断言所有预定义的消息都在队列中

我的问题:如何模拟 websockets.connection 以实现步骤 1-3?我在想像这样的 pytest fixture

from websockets import WebSocketClientProtocol()
@pytest.fixture
def patch_websockets_connect(monkeypatch):
async def mock_ws_connect(*args, **kwargs):
mock_connection = WebSocketClientProtocol()
mock_connection.is_closed = False
return mock_connection

monkeypatch.setattr('target_module.websockets.connect', mock_ws_connect)

但我不明白我如何能够以这种方式返回预定义的消息列表,而且必须有更好的方法来做到这一点。

最佳答案

这不是一个完整的答案,但也许会对您有所帮助。

我在测试 rabbitmq 消息发送时遇到了类似的问题。看起来这里最常见和最可靠的方法是创建 Connection,创建 EmmiterConsumer,连接 Consumer到假的 Channel 并在测试中手动将消息发送到该 channel 。

因此,您创建了所有对象并只是模拟了它们的响应。这是非常相似的示例(使用套接字而不使用 websockets,但可能对您仍然有用):mocking a socket connection in Python

关于python - 测试 : testing a websocket connection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40884504/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com