gpt4 book ai didi

python - 为异步 Tornado Web 套接字服务器编写同步测试套件

转载 作者:太空狗 更新时间:2023-10-30 01:08:10 25 4
gpt4 key购买 nike

我正在尝试为我的 tornado 网络套接字服务器设计一个测试套件。

我正在使用客户端来执行此操作 - 通过 websocket 连接到服务器,发送请求并期待特定响应。

我正在使用 python 的单元测试来运行我的测试,所以我不能(也不想真的)强制执行测试的运行顺序。

这就是我的基础测试类(之后所有测试用例继承)的组织方式。 (日志记录和某些部分,此处无关的部分被剥离)。

class BaseTest(tornado.testing.AsyncTestCase):
ws_delay = .05

@classmethod
def setUpClass(cls):
cls.setup_connection()
return

@classmethod
def setup_connection(cls):
# start websocket threads
t1 = threading.Thread(target=cls.start_web_socket_handler)
t1.start()

# websocket opening delay
time.sleep(cls.ws_delay)

# this method initiates the tornado.ioloop, sets up the connection
cls.websocket.connect('localhost', 3333)

return

@classmethod
def start_web_socket_handler(cls):
# starts tornado.websocket.WebSocketHandler
cls.websocket = WebSocketHandler()
cls.websocket.start()

我想出的方案是让这个基类为所有测试初始化​​一次连接(虽然这不一定是这种情况 - 我很乐意为每个测试用例建立和拆除连接,如果它解决了我的问题)。重要的是我不想同时打开多个连接。

简单的测试用例看起来像这样。

class ATest(BaseTest):
@classmethod
def setUpClass(cls):
super(ATest, cls).setUpClass()

@classmethod
def tearDownClass(cls):
super(ATest, cls).tearDownClass()

def test_a(self):
saved_stdout = sys.stdout
try:
out = StringIO()
sys.stdout = out
message_sent = self.websocket.write_message(
str({'opcode': 'a_message'}})
)

output = out.getvalue().strip()
# the code below is useless
while (output is None or not len(output)):
self.log.debug("%s waiting for response." % str(inspect.stack()[0][3]))
output = out.getvalue().strip()

self.assertIn(
'a_response', output,
"Server didn't send source not a_response. Instead sent: %s" % output
)
finally:
sys.stdout = saved_stdout

它大部分时间都运行良好,但不是完全确定的(因此不可靠)。由于 websocket 通信是异步执行的,而 unittest 同步执行测试,服务器响应(在同一个 websocket 上接收到的)与请求混淆,测试偶尔会失败。

我知道它应该是基于回调的,但这并不能解决响应混合问题。除非,所有测试都在一系列回调中人为地排序(如在 test_1_callback 中启动 test_2)。

Tornado 提供了一个 testing library帮助同步测试,但我似乎无法让它与 websockets 一起工作(tornado.ioloop 有它自己的线程,你不能阻止)。

我找不到可以与 tornado 服务器一起使用并且符合 RFC 6455 标准的 python websocket 同步客户端库。皮皮的websocket-client无法满足第二个需求。

我的问题是:

  1. 有没有可靠的python同步websocket客户端库满足上述需求?

  2. 如果不是,组织这样的测试套件的最佳方式是什么(这些测试实际上不能并行运行)?

  3. 据我了解,由于我们使用的是一个 websocket,测试用例的 IOStreams 无法分开,因此无法确定响应来自哪个请求(我有多个测试对于具有不同参数的相同类型的请求)。我错了吗?

最佳答案

你看过websocket unit tests了吗?包括 Tornado ?他们向您展示了如何做到这一点:

from tornado.testing import AsyncHTTPTestCase, gen_test
from tornado.websocket import WebSocketHandler, websocket_connect

class MyHandler(WebSocketHandler):
""" This is the server code you're testing."""

def on_message(self, message):
# Put whatever response you want in here.
self.write_message("a_response\n")


class WebSocketTest(AsyncHTTPTestCase):
def get_app(self):
return Application([
('/', MyHandler, dict(close_future=self.close_future)),
])

@gen_test
def test_a(self):
ws = yield websocket_connect(
'ws://localhost:%d/' % self.get_http_port(),
io_loop=self.io_loop)
ws.write_message(str({'opcode': 'a_message'}}))
response = yield ws.read_message()
self.assertIn(
'a_response', response,
"Server didn't send source not a_response. Instead sent: %s" % response
)v

gen_test decorator 允许您将异步测试用例作为协程运行,当在 tornado 的 ioloop 中运行时,它们有效地使它们为了测试目的而同步运行。

关于python - 为异步 Tornado Web 套接字服务器编写同步测试套件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24614053/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com