gpt4 book ai didi

python - 使用 Autobahn WebSocket 试用单元测试

转载 作者:行者123 更新时间:2023-11-28 17:48:12 27 4
gpt4 key购买 nike

我正在尝试为使用 Autobahn 的应用程序编写单元测试。

我想测试我的 Controller ,它从协议(protocol)中获取接收到的数据,解析它并对其使用react。

但是当我的测试到了应该断开协议(protocol)(self.sendClose)的点时,它会引发错误

exceptions.AttributeError: 'MyProtocol' object has no attribute 'state'.

我尝试使用 proto_helpers.StringTransport makeConnection 但是我也有错误

exceptions.AttributeError: StringTransport instance has no attribute 'setTcpNoDelay'`

我正在使用 trial 并且我不想仅出于测试目的运行虚拟服务器/客户端,因为不推荐这样做。

我应该如何编写测试,以便我可以使用假连接和试验来测试发送数据、读取数据、断开连接等功能?

最佳答案

如果不查看 MyProtocol 类,很难准确地说出发生了什么。这个问题听起来很像是因为你直接在低级函数上乱搞,因此也是 WebSocket 类的 state 属性,也就是,表示 WebSocket 连接的内部状态。

根据 the autobahn reference doc ,您可以直接使用和覆盖的 WebSicketProtocol API 是:

  • 打开
  • onMessage
  • 关闭
  • 发送消息
  • 发送关闭

您使用 StringTransport 测试协议(protocol)的方法并不理想。问题在于 MyProtocol 是 autobahn 提供的 WebSocketProtocol 框架之上的一个小层,无论好坏,它都隐藏了有关管理连接的细节,传输和内部协议(protocol)状态。

如果你考虑一下,你想测试你的东西,而不是 WebSocketProtocol,因此如果你不想嵌入虚拟服务器或客户端,最好的办法是直接测试那些方法MyProtocol 覆盖。

下面是我所说的一个例子

class MyPublisher(object):
cbk=None

def publish(self, msg):
if self.cbk:
self.cbk(msg)

class MyProtocol(WebSocketServerProtocol):

def __init__(self, publisher):
WebSocketServerProtocol.__init__(self)
#Defining callback for publisher
publisher.cbk = self.sendMessage

def onMessage(self, msg, binary)
#Stupid echo
self.sendMessage(msg)

class NotificationTest(unittest.TestCase):

class MyProtocolFactory(WebSocketServerFactory):
def __init__(self, publisher):
WebSocketServerFactory.__init__(self, "ws://127.0.0.1:8081")
self.publisher = publisher
self.openHandshakeTimeout = None

def buildProtocol(self, addr):
protocol = MyProtocol(self.listener)
protocol.factory = self
protocol.websocket_version = 13 #Hybi version 13 is supported by pretty much everyone (apart from IE <8 and android browsers)
return protocol

def setUp(self):
publisher = task.LoopingCall(self.send_stuff, "Hi there")
factory = NotificationTest.MyProtocolFactory(listener)
protocol = factory.buildProtocol(None)
transport = proto_helpers.StringTransport()
def play_dumb(*args): pass
setattr(transport, "setTcpNoDelay", play_dumb)
protocol.makeConnection(transport)
self.protocol, self.transport, self.publisher, self.fingerprint_handler = protocol, transport, publisher, fingerprint_handler

def test_onMessage(self):
#Following 2 lines are the problematic part. Here you are manipulating explicitly a hidden state which your implementation should not be concerned with!
self.protocol.state = WebSocketProtocol.STATE_OPEN
self.protocol.websocket_version = 13
self.protocol.onMessage("Whatever")
self.assertEqual(self.transport.value()[2:], 'Whatever')

def test_push(self):
#Following 2 lines are the problematic part. Here you are manipulating explicitly a hidden state which your implementation should not be concerned with!
self.protocol.state = WebSocketProtocol.STATE_OPEN
self.protocol.websocket_version = 13
self.publisher.publish("Hi there")
self.assertEqual(self.transport.value()[2:], 'Hi There')

您可能已经注意到,在这里使用 StringTransport 非常麻烦。您必须了解 underline 框架并绕过它的状态管理,这是您不想做的事情。不幸的是,autobahn 不提供允许简单状态操作的现成可用测试对象,因此我使用虚拟服务器和客户端的建议仍然有效


使用网络测试您的服务器

提供的测试展示了如何测试服务器推送,断言您得到的是您所期望的,并且还使用了一个钩子(Hook)来确定何时完成。

服务器协议(protocol)

from twisted.trial.unittest import TestCase as TrialTest
from autobahn.websocket import WebSocketServerProtocol, WebSocketServerFactory, WebSocketClientProtocol, WebSocketClientFactory, connectWS, listenWS
from twisted.internet.defer import Deferred
from twisted.internet import task

START="START"

class TestServerProtocol(WebSocketServerProtocol):

def __init__(self):
#The publisher task simulates an event that triggers a message push
self.publisher = task.LoopingCall(self.send_stuff, "Hi there")

def send_stuff(self, msg):
#this method sends a message to the client
self.sendMessage(msg)

def _on_start(self):
#here we trigger the task to execute every second
self.publisher.start(1.0)

def onMessage(self, message, binary):
#According to this stupid protocol, the server starts sending stuff when the client sends a "START" message
#You can plug other commands in here
{
START : self._on_start
#Put other keys here
}[message]()

def onClose(self, wasClean, code, reason):
#After closing the connection, we tell the task to stop sending messages
self.publisher.stop()

客户端协议(protocol)和工厂

下一类是客户端协议(protocol)。它基本上告诉服务器开始推送消息。它调用它们的 close_condition 来查看是否到了关闭连接的时间,并且作为最后一件事,它调用了它收到的消息的 assertion 函数来查看是否测试成功与否

class TestClientProtocol(WebSocketClientProtocol):
def __init__(self, assertion, close_condition, timeout, *args, **kwargs):
self.assertion = assertion
self.close_condition = close_condition
self._received_msgs = []
from twisted.internet import reactor
#This is a way to set a timeout for your test
#in case you never meet the conditions dictated by close_condition
self.damocle_sword = reactor.callLater(timeout, self.sendClose)

def onOpen(self):
#After the connection has been established,
#you can tell the server to send its stuff
self.sendMessage(START)

def onMessage(self, msg, binary):
#Here you get the messages pushed from the server
self._received_msgs.append(msg)
#If it is time to close the connection
if self.close_condition(msg):
self.damocle_sword.cancel()
self.sendClose()

def onClose(self, wasClean, code, reason):
#Now it is the right time to check our test assertions
self.assertion.callback(self._received_msgs)

class TestClientProtocolFactory(WebSocketClientFactory):
def __init__(self, assertion, close_condition, timeout, **kwargs):
WebSocketClientFactory.__init__(self, **kwargs)
self.assertion = assertion
self.close_condition = close_condition
self.timeout = timeout
#This parameter needs to be forced to None to not leave the reactor dirty
self.openHandshakeTimeout = None

def buildProtocol(self, addr):
protocol = TestClientProtocol(self.assertion, self.close_condition, self.timeout)
protocol.factory = self
return protocol

基于试验的测试

class WebSocketTest(TrialTest):

def setUp(self):
port = 8088
factory = WebSocketServerFactory("ws://localhost:{}".format(port))
factory.protocol = TestServerProtocol
self.listening_port = listenWS(factory)
self.factory, self.port = factory, port

def tearDown(self):
#cleaning up stuff otherwise the reactor complains
self.listening_port.stopListening()

def test_message_reception(self):
#This is the test assertion, we are testing that the messages received were 3
def assertion(msgs):
self.assertEquals(len(msgs), 3)

#This class says when the connection with the server should be finalized.
#In this case the condition to close the connectionis for the client to get 3 messages
class CommunicationHandler(object):
msg_count = 0

def close_condition(self, msg):
self.msg_count += 1
return self.msg_count == 3

d = Deferred()
d.addCallback(assertion)
#Here we create the client...
client_factory = TestClientProtocolFactory(d, CommunicationHandler().close_condition, 5, url="ws://localhost:{}".format(self.port))
#...and we connect it to the server
connectWS(client_factory)
#returning the assertion as a deferred purely for demonstration
return d

这显然只是一个示例,但如您所见,我不必乱搞 makeConnection 或任何显式的 transport

关于python - 使用 Autobahn WebSocket 试用单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14757054/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com