- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
假设您只使用 Autobahn 连接(不是原始的 WebSocket)。
我们如何在没有网络的情况下测试我们的 RPC 方法和事件?
由于它是 Twisted,我认为最合适的工具是 Twisted Trial。
但我无法弄清楚我应该如何编写这些测试,而不编写大量样板代码并重新使用 Autobahn 的内部实现(甚至我不确定我是否能够这样做).
你会怎么做?
最佳答案
试图回答我自己的问题。
然后,要对 RPC 方法和事件进行单元测试,我们需要假设 Autobahn 已经过良好测试,我们不必对其进行测试,解决方案就变得简单了:
全部模拟。
在我的应用程序中,我有两种类型的组件(阅读 ApplicationSession):StandardComponent 和 DatabaseComponent(继承自 StandardComponent)。
单元测试中最大的问题是我们有很多依赖关系,比如数据库连接、Redis 连接等......
我在测试中所做的是通过子类化 unittest.TestCase
来修补所有这些对象:
class APITestCase(unittest.TestCase):
def _patchObject(self, module_name, **kwargs):
patcher = patch(module_name, **kwargs)
mock = patcher.start()
self.patches.append(patcher)
return mock
def setUp(self):
logging.disable(logging.CRITICAL)
self.patches = []
self.session = self._patchObject('components.ApplicationAPI')
self.database = self._patchObject('txpostgres.txpostgres.Connection')
def tearDown(self):
for patcher in self.patches:
patcher.stop()
我在我的测试用例中注入(inject)了一个模拟 session 和一个模拟数据库。
然后,测试变得非常非常简单。
每当我调用需要调用数据库或从数据库获取结果的 RPC 方法时,我都会对其进行修补:self.mocked_auth_user.return_value = (1, "abc", "something", "admin")
在我的测试方法中:
def test_authenticate_success(self):
self.mocked_auth_user.return_value = (1, "abc", "paris", "admin")
def _doTest(auth_result):
attempted_auth_result = {
"secret": "abc",
"role": "admin",
"authid": "1",
"salt": "paris",
"iterations": 1000,
"keylen": 32
}
self.assertEqual(auth_result, attempted_auth_result)
self.mocked_auth_user.assert_called_with(self.api.database, "raito")
return self.api.authenticate("test", "raito", {}).addCallback(_doTest)
你可以做一些更高级和有趣的测试,看看你的方法是否是防错的:
def test_authenticate_authid_not_found(self):
def _raiseException(db, user):
return defer.fail(Exception("User {} not found!".format(user)))
self.mocked_auth_user.side_effect = _raiseException
return self.failUnlessFailure(self.api.authenticate("test", "raito", {}), AuthenticationError)
事件也是如此,你只需要调用它们并测试它们是否发布事件(self.session.publish.assert_called_with(...)
)
它变得神奇!
无论如何,它解决了单元测试问题,但是集成还没有。我正在研究它,但这个问题可能会使用一些虚拟化技术 (Docker) 或类似的东西来解决。
关于python - 如何使用 Twisted Trial 对 Autobahn 应用程序进行单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29199906/
我最近发现 autobahn python 和 js 是建立 pub/sub 服务器和相应客户端的舒适方法,即使使用 rpc 调用也是如此。 看完教程后,我设置了一个测试版本,其中有一个 websoc
我正在使用 Ratchet 和 Autobahn.js。我想在订阅时进行一些用户验证,所以我需要将 session key 传递给 Ratchet WAMP 服务器。你能告诉我如何在订阅事件中将一些数
我有一个 Crossbar.js 实现,其中数据通过 Crossbar websocket 服务器在客户端(网站)和服务器(Node.js)之间发送。双方都使用 Autobahn.JS 连接到 Cro
我正在我们的客户和我们之间建立一个博客,以便他们可以有一个空间来讨论不同的商业交易(每笔商业交易都有一个引用号并有自己的信息)。 现在我可以使用 ZeroMQ 和 Autobahn 设置 Ratche
我正在使用 Autobahn Wamp 在 Python 中进行套接字连接。我正在使用 PubSub 机制来建立连接。连接建立成功。 但我找不到一种方法来维护与服务器连接的客户端列表。 任何人都可以建
我正在试验 Ratchet php library .特别是,我一直在尝试整合他们的 push integration演示到 React 应用程序中。 他们的演示引用 a seemingly one-
我正在使用下面的 JavaScript 代码连接到高速公路服务器。一直以来我都使用静态IP,但现在IP将是动态的。如何传递动态服务器 IP 并动态连接高速公路? var connection = ne
我正在研究 WebSockets atm,刚刚发现 Autobahn 带有 Autobahn|Python 。我不确定我是否正确理解该工具集(?)的功能。 我的目的是使用 WebSocket-Serv
我正在尝试为使用 Autobahn 的应用程序编写单元测试。 我想测试我的 Controller ,它从协议(protocol)中获取接收到的数据,解析它并对其使用react。 但是当我的测试到了应该
我目前正在尝试通过 Crossbar/Autobahn 使用 Websockets 实现用户通知系统。我已经完成了多项测试并阅读了文档,但是,我不确定是否有解决以下工作流程的方法: 用户使用网络应用程
我从 OpenCV VideoCapture.read() 捕获视频帧并将帧发送到 WebSocket 服务器(Twisted with Autobahn WebSocket API),我还使用 Tw
当使用 autobahn 进行 RPC 时,autobahn 支持返回 python 对象的过程,只要它们是 json 可序列化的。我想要返回的某些对象默认情况下不可 json 序列化。我可以轻松编写
是否可以运行(挂载在cherrypy树中)autobahnn的websocket类以在相同的端口但不同的URL上运行? 例如: http://localhost:8080/web服务器静态内容(htm
我喜欢 crossbar.io 及其工作方式(个人)。但我想知道如何使用 Autobahn(Crossbar.io) 为典型的动态聊天应用程序设置架构。 这里的动态聊天是指为每个 url 创建的单独聊
在 autobahnJS 的文档中提供以下示例来说明如何设置远程过程调用 (RPC): ... // 3) register a procedure for remoting functi
我在这里运行了这个例子:https://github.com/crossbario/crossbarexamples/tree/master/wss/python ,一切正常。 但是,以下情况对我不起
场景:1. 我有一个 ListView ,即时获取数据。由于滚动它可能获得的数据可能是陈旧的。 2. 当我使用线程实现 HTTP 客户端时,在滚动时我会取消所有请求 threadHandler.rem
我想重写我的 WebSocketClientFactory 类以允许传入数据填充作业队列。这是我正在尝试的连接代码 factory = WebSocketClientFactory("ws:/
我正在继承 WampCraServerProtocol(来自 Autobahn Python)并覆盖 getAuthSecret。我知道现在我可以从那个方法返回一个延迟,但是,当做一个简单的测试时:
我正在尝试将 aiohttp 网络服务器集成到 Crossbar+Autobahn 系统架构中。 更详细地说,当aiohttp 服务器收到某个API 调用时,它必须向Crossbar 路由器发布消息。
我是一名优秀的程序员,十分优秀!