gpt4 book ai didi

python - 有没有办法在多线程中使用 asyncio.Queue?

转载 作者:太空狗 更新时间:2023-10-29 17:37:41 25 4
gpt4 key购买 nike

假设我有以下代码:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
import time
while True:
time.sleep(2)
queue.put_nowait(time.time())
print(queue.qsize())

@asyncio.coroutine
def async():
while True:
time = yield from queue.get()
print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()

此代码的问题在于 async 协同程序中的循环永远不会完成第一次迭代,而 queue 大小正在增加。

为什么会这样,我该如何解决?

我无法摆脱单独的线程,因为在我的实际代码中,我使用单独的线程与串行设备通信,而且我还没有找到使用 asyncio 来做到这一点的方法.

最佳答案

asyncio.Queue is not thread-safe ,所以你不能直接从多个线程使用它。相反,您可以使用 janus ,这是一个第三方库,提供线程感知的 asyncio 队列:

import asyncio
import threading
import janus

def threaded(squeue):
import time
while True:
time.sleep(2)
squeue.put_nowait(time.time())
print(squeue.qsize())

@asyncio.coroutine
def async(aqueue):
while True:
time = yield from aqueue.get()
print(time)

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
asyncio.Task(asyncio.ensure_future(queue.async_q))
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()

还有aioprocessing (完全公开:我写的),它也提供进程安全(并且作为副作用,线程安全)队列,但如果您不尝试使用 multiprocessing 那就太过分了.

编辑

正如其他答案中指出的那样,对于简单的用例,您可以使用 loop.call_soon_threadsafe也添加到队列中。

关于python - 有没有办法在多线程中使用 asyncio.Queue?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32889527/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com