gpt4 book ai didi

python-3.x - 使用 asyncio 实现类似选择的功能

转载 作者:行者123 更新时间:2023-12-02 14:46:39 24 4
gpt4 key购买 nike

在尝试使用 asyncio(在 Python 3.6 上)实现类似于 select() 的功能时,我遇到了一种我不理解的行为:

  • 我有 2 个队列(AB),其中消息由异步生产者 Queue.put
  • 我有一个异步消费者(选择器),它轮询队列并获取第一条可用消息(即使用 的类似 select() 的功能) asyncio.wait_for

投票的工作方式如下:

poll = list(_.get() for _ in queues)
while True:
done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)

然后,迭代 done future ,我用新的 Queue.get 替换 poll 中的相应条目:

    for f in done:
try:
i = poll.index(f._coro)
...
poll[i] = queues[i].get()

现在,我遇到的奇怪行为是选择有效,但一些done futures .result() 返回None ,并且通过队列发送的消息会丢失:

PUT B ('B', 0)/0
GET B ('B', 0)/0
PUT A ('A', 0)/0
GET A None/0 << ('A', 0) is never received
PUT A ('A', 1)/0
GET A ('A', 1)/0
PUT B ('B', 1)/0
GET B None/0
PUT A ('A', 2)/0
GET A None/0
PUT B ('B', 2)/0
GET B None/0

这是代码

#!/usr/bin/env python3
import asyncio, random

async def queue_generator( name, queue, speed=1 ):
counter = 0
while True:
t = (random.random() + 0.5) * speed
await asyncio.sleep(t)
m = (name, counter)
print ("PUT {0} {1}/{2:d}".format(name, m, queue.qsize()))
queue.put_nowait(m)
counter += 1

async def select( *queues ):
poll = list(_.get() for _ in queues)
while True:
# That's the select()-like functionalit
done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)
for f in done:
i = poll.index(f._coro)
# That's where sometimes v is None
v = f.result()
print ("GET {0} {1}/{2}".format("AB"[i], v, queues[i].qsize()))
poll[i] = queues[i].get()
await asyncio.sleep(0.5)

if __name__ == "__main__":
loop = asyncio.get_event_loop()
queue_a = asyncio.Queue(10)
queue_b = asyncio.Queue(10)
tasks = (
loop.create_task(queue_generator("A", queue_a, 3)),
loop.create_task(queue_generator("B", queue_b, 3)),
loop.create_task(select(queue_a, queue_b))
)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

由于每当当前有消息的队列发生更改时都会发生这种情况,我认为问题在于对 pending future 进行某些操作。事实上,添加

    for f in pending:
f.cancel()

解决了这个问题,但我想尽可能地重复使用这些 future。我认为问题来自于 asyncio.wait_for 默默地将生成器列表转换为任务。

最佳答案

我没有深入研究发生的情况,但移动 poll 创建内部循环似乎可以解决问题:

async def select( *queues ):
while True:
poll = list(_.get() for _ in queues) # HERE
# That's the select()-like functionalit
done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)

poll 是协程列表。每个独特的协程通常应该等待一次。否则可能会导致奇怪的事情。

关于python-3.x - 使用 asyncio 实现类似选择的功能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46119912/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com