gpt4 book ai didi

具有队列 : asyncio. 的 Python 3.8 websocket 回显客户端 Queue get() 不会即时添加队列项

转载 作者:行者123 更新时间:2023-12-05 07:09:50 26 4
gpt4 key购买 nike

我尝试创建一个客户端,它使用 asyncio.Queue 来提供我想发送到服务器的消息。从 websocket 服务器接收数据效果很好。发送由生产者刚刚生成的数据也有效。为了解释什么有效,什么失败,首先是我的代码:

import sys
import asyncio
import websockets


class WebSocketClient:
def __init__(self):
self.send_queue = asyncio.Queue()
#self.send_queue.put_nowait('test-message-1')

async def startup(self):
await self.connect_websocket()
consumer_task = asyncio.create_task(
self.consumer_handler()
)
producer_task = asyncio.create_task(
self.producer_handler()
)
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.ALL_COMPLETED
)
for task in pending:
task.cancel()

async def connect_websocket(self):
try:
self.connection = await websockets.client.connect('ws://my-server')
except ConnectionRefusedError:
sys.exit('error: cannot connect to backend')

async def consumer_handler(self):
async for message in self.connection:
await self.consumer(message)

async def consumer(self, message):
self.send_queue.put_nowait(message)
# await self.send_queue.put(message)
print('mirrored message %s now in queue, queue size is %s' % (message, self.send_queue.qsize()))

async def producer_handler(self):
while True:
message = await self.producer()
await self.connection.send(message)

async def producer(self):
result = await self.send_queue.get()
self.send_queue.task_done()
#await asyncio.sleep(10)
#result = 'test-message-2'
return result


if __name__ == '__main__':
wsc = WebSocketClient()
asyncio.run(wsc.startup())

连接效果很好。如果我从我的服务器向客户端发送一些东西,这也很好用,并在 consumer() 中打印消息。但是生产者永远不会收到我在 consumer() 中放入 send_queue 的任何消息。

之所以在consumer()中选择send_queue.put_nowait,是为了防止死锁。如果我使用 await self.send_queue.put(message) 行而不是 self.send_queue.put_nowait(message) 这行没有区别。

我想,也许队列根本不起作用,所以我在 __init__() 创建时向队列填充了一些东西: self.send_queue.put_nowait("test-message -1")。这有效并发送到我的服务器。所以队列的基本概念和 await queue.get() 起作用了。

我还想,也许生产者有问题,所以让我们在运行时随机生成消息:result = "test-message-2" 而不是 result = await self .send_queue.get()。这也有效:每 10 秒“test-message-2”发送到我的服务器。

编辑:如果我尝试将来自其他来源的东西即时添加到队列中,也会发生这种情况。我构建了一个小型异步套接字服务器,可以将任何消息推送到队列,效果很好,您可以在 consumer()< 中看到我使用 qsize() 从其他来源添加的消息,但仍然没有成功 queue.get()。所以队列本身似乎可以工作,只是 get() 不行。顺便说一句,这也是排队的原因:我想从完全不同的来源发送数据。

所以,这就是我卡住的地方。我的疯狂猜测是我在 producer() 中使用的队列与在 consumer() 中使用的队列不同,如果您使用非线程,在线程中很容易发生这种情况- 像 asyncio.Queue 这样的安全队列,但据我了解,我根本不使用线程,只是协程。那么,这里还有什么问题吗?

仅供引用:它是 docker 容器内的 Ubuntu 20.04 python 3.8.2。

谢谢,埃内斯托

最佳答案

仅供记录 - 我的问题的解决方案非常简单:我在我的 websocket 客户端创建的事件循环之外定义了 send_queue。所以它调用了 events.get_event_loop() 并获得了自己的循环 - 这不是主循环的一部分,因此从未调用过,因此 await queue.get() 真的从不得到任何返回。

在正常模式下,您看不到任何提示此问题的消息。但是,python 文档来拯救:当然他们在 https://docs.python.org/3/library/asyncio-dev.html 提到了它: logging.DEBUG 提供了我找到问题所需的提示。

关于具有队列 : asyncio. 的 Python 3.8 websocket 回显客户端 Queue get() 不会即时添加队列项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61440186/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com