gpt4 book ai didi

python-3.x - 有效使用多个 Asyncio 队列

转载 作者:行者123 更新时间:2023-12-04 10:34:09 25 4
gpt4 key购买 nike

我目前正在构建一个需要对各种端点发出多个请求的项目。我将这些请求包装在 Aiohttp 中以允许异步。

问题:
我有三个队列:queue1 , queue2 , 和 queue3 .此外,我有三个与其各自的队列相关联的工作函数( worker1worker2worker3 )。第一个队列立即使用运行前已知的列表 ID 填充。当请求完成并将数据提交到数据库时,它会将 ID 传递给 queue2 .一个 worker2将使用此 ID 并请求更多数据。根据这些数据,它将开始生成 ID 列表(不同于 queue1/queue2 中的 ID。worker2 会将 ID 放入 queue3。最后 worker3 将从 queue3 中获取此 ID 并请求更多数据在提交到数据库之前。

问题在于 queue.join() 的事实。是阻塞调用。每个工作人员都绑定(bind)到一个单独的队列,因此 queue1 的连接将阻塞直到完成。这很好,但也违背了使用异步的目的。不使用 join()当队列完全为空时,程序无法检测到。另一个问题是,当其中一个队列为空但仍有数据尚未添加时,可能会出现静默错误。

基本代码大纲如下:

queue1 = asyncio.Queue()
queue2 = asyncio.Queue()
queue3 = asyncio.Queue()

async with aiohttp.ClientSession() as session:
for i in range(3):
tasks.append(asyncio.create_task(worker1(queue1)))

for i in range(3):
tasks.append(asyncio.create_task(worker2(queue2)))

for i in range(10):
tasks.append(asyncio.create_task(worker3(queue3)))

for i in IDs:
queue1.put_nowait(i)

await asyncio.gather(*tasks)

工作函数处于无限循环中,等待项目进入队列。

当数据全部处理完毕后,将不会退出,程序将挂起。

有没有办法有效管理 worker 并妥善结束?

最佳答案

正如 this answer 中很好解释的那样, Queue.join用于在注入(inject)队列的所有工作完成时通知生产者。由于您的第一个队列不知道特定项目何时完成(它被相乘并分发到其他队列),join不是适合您的工具。
从您的代码来看,您的工作人员似乎只需要运行处理队列的初始项目所需的时间。如果是这种情况,那么您可以使用关闭哨兵信号通知工作人员退出。例如:

async with aiohttp.ClientSession() as session:

# ... create tasks as above ...

for i in IDs:
queue1.put_nowait(i)
queue1.put_nowait(None) # no more work

await asyncio.gather(*tasks)
这就像您的原始代码,但有一个明确的关闭请求。 worker 必须检测到哨兵并做出相应的 react :将其传播到下一个队列/ worker 并退出。例如,在 worker1 :
while True:
item = queue1.get()
if item is None:
# done with processing, propagate sentinel to worker2 and exit
await queue2.put(None)
break
# ... process item as usual ...
在其他两个工作人员中执行相同操作( worker3 除外,因为没有下一个队列而不会传播)将导致所有三个任务在工作完成后完成。由于队列是先进先出的,因此工作人员可以在遇到哨兵后安全退出,因为他们知道没有元素被丢弃。显式关闭还将关闭队列与恰好为空的队列区分开来,从而防止工作人员因队列暂时为空而过早退出。
直到 Python 3.7,这种技术实际上是 demonstratedQueue 的文档中,但该示例有点令人困惑地显示了 Queue.join 的使用。以及使用关闭哨兵。两者是分开的,可以相互独立使用。 (同时使用它们也可能有意义,例如使用 Queue.join 等待“里程碑”,然后将其他内容放入队列中,同时保留哨兵以停止工作人员。)

关于python-3.x - 有效使用多个 Asyncio 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60266397/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com