gpt4 book ai didi

django - 使用 WebSocketCommunicator 在 Django Channels v2 测试中进行身份验证

转载 作者:行者123 更新时间:2023-12-04 12:24:26 25 4
gpt4 key购买 nike

在为我的聊天消费者编写测试的过程中,我遇到了无法在测试中使用 WebSocketCommunicator 进行身份验证的问题。我有自定义的 JwtTokenAuthMiddleware,它通过在请求查询中使用 token 在套接字中实现身份验证,因为据我所知,使用授权 header 进行体面的身份验证是不可能的。你们能就此给我建议或向我提供示例代码,我在网上找不到吗?顺便说一句,我的聊天没有问题。测试也应该完全没问题,我从官方文档 Django Channels 2.x 测试中获取了指南。

--JwtTokenAuthMiddlewate--

class JwtTokenAuthMiddleware:
def __init__(self, inner):
self.inner = inner

def __call__(self, scope):
close_old_connections()
try:
raw_token = scope['query_string'].decode().split('=')[1]
auth = JWTAuthentication()
validated_token = auth.get_validated_token(raw_token)
user = auth.get_user(validated_token)
scope['user'] = user
except (IndexError, InvalidToken, AuthenticationFailed):
scope['user'] = AnonymousUser()
return self.inner(scope)


JwtTokenAuthMiddlewareStack = lambda inner: JwtTokenAuthMiddleware(AuthMiddlewareStack(inner))

--示例测试--
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_trainer_auth_success():
room = await database_sync_to_async(RoomFactory.create)()
trainer = room.trainer
trainer_token = await sync_to_async(get_token_for_user)(trainer.user)
room_url = f'ws/room/{room.id}/'

trainer_communicator = WebsocketCommunicator(application, f'{room_url}?t={trainer_token}')
connected, _ = await trainer_communicator.connect()
assert connected

trainer_connect_resp = await trainer_communicator.receive_json_from()
assert_connection(trainer_connect_resp, [], room.max_round_time)
await trainer_communicator.disconnect()

--错误追溯--
___________________________________________________________ test_trainer_auth_success ___________________________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f6b9906f290>, timeout = 1

async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout(timeout):
> return await self.output_queue.get()

/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x7f6b98f76510 maxsize=0>

async def get(self):
"""Remove and return an item from the queue.

If queue is empty, wait until an item is available.
"""
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
> await getter
E concurrent.futures._base.CancelledError

/usr/local/lib/python3.7/asyncio/queues.py:159: CancelledError

During handling of the above exception, another exception occurred:

@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_trainer_auth_success():
room = await database_sync_to_async(RoomFactory.create)()
trainer = room.trainer
trainer_token = await sync_to_async(get_token_for_user)(trainer.user)
room_url = f'ws/room/{room.id}/'

# trainer_communicator = await assert_get_connected_communicator(application, room_url, trainer_token)
trainer_communicator = WebsocketCommunicator(application, f'{room_url}?t={trainer_token}')
> connected, _ = await trainer_communicator.connect()

apps/chat/tests/test_consumers.py:39:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/site-packages/channels/testing/websocket.py:36: in connect
response = await self.receive_output(timeout)
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:85: in receive_output
raise e
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74: in receive_output
return await self.output_queue.get()
/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asgiref.timeout.timeout object at 0x7f6b98f76d50>, exc_type = <class 'concurrent.futures._base.CancelledError'>

def _do_exit(self, exc_type: Type[BaseException]) -> None:
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
> raise asyncio.TimeoutError
E concurrent.futures._base.TimeoutError

/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
=============================================================== warnings summary ================================================================
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
item = pytest.Function(name, parent=collector)

/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
item = pytest.Function(name, parent=collector) # To reload keywords.

最佳答案

我一直在尝试做或多或少相同的事情,似乎在测试消费者和传播者时没有简单的方法来验证用户。 There is关于此主题的 GH 问题,其中提供了一些(有效!)解决方法,也许您会发现它有帮助。

关于django - 使用 WebSocketCommunicator 在 Django Channels v2 测试中进行身份验证,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61387933/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com