gpt4 book ai didi

python asyncio 在 key 可用时通过 key 从字典中异步获取数据

转载 作者:太空宇宙 更新时间:2023-11-03 10:47:54 32 4
gpt4 key购买 nike

如标题所示,我的用例是这样的:

我有一个 aiohttp 服务器,它接受来自客户端的请求,当我收到请求时,我为它生成一个唯一的请求 ID,然后我发送 {req_id: req_pyaload}给一些 worker 的命令( worker 不在 python 中,因此在另一个进程中运行),当 worker 完成工作时,我得到响应并将它们放入这样的结果字典中:{req_id_1: res_1, req_id_2: res_2} .

然后我希望我的 aiohttp 服务器处理程序为 await在上面result dict ,因此当特定响应可用时(通过 req_id),它可以将其发回。

我构建了下面的示例代码来尝试模拟该过程,但在执行协程时卡住了 async def fetch_correct_res(req_id)这应该异步/无阻塞通过req_id获取正确的响应.

import random
import asyncio
import shortuuid

n_tests = 1000

idxs = list(range(n_tests))

req_ids = []
for _ in range(n_tests):
req_ids.append(shortuuid.uuid())

res_dict = {}

async def fetch_correct_res(req_id):
pass

async def handler(req):
res = await fetch_correct_res(req)
assert req == res, "the correct res for the req should exactly be the req itself."
print("got correct res for req: {}".format(req))

async def randomly_put_res_to_res_dict():
for _ in range(n_tests):
random_idx = random.choice(idxs)
await asyncio.sleep(random_idx / 1000)
res_dict[req_ids[random_idx]] = req_ids[random_idx]
print("req: {} is back".format(req_ids[random_idx]))

所以:

  1. 这个解决方案是否可行?怎么办?

  2. 如果上述解决方案不可行,对于这个使用 asyncio 的用例,正确的解决方案应该是什么?

非常感谢。


目前我能想到的唯一方法是:预先创建一些 asyncio.Queue使用预先分配的 id,然后为每个传入请求分配一个队列,因此处理程序只是 await在这个队列上,当响应返回时,我只将它放入这个预先分配的队列中,在请求完成后,我收集回队列以用于下一个传入请求。不是很优雅,但会解决问题。

最佳答案

看看下面的示例实现是否满足您的需求

基本上你想用你的响应(无法预测顺序)以异步方式响应请求(id)

因此在请求处理时,用 {request_id: {'event':<async.Event>, 'result': <result>}} 填充字典和 awaitasyncio.Event.wait() , 一旦收到响应,用 asyncio.Event.set() 发出事件信号这将释放等待,然后根据请求 ID 从字典中获取响应

我稍微修改了您的代码以使用请求 ID 预填充字典并放入 awaitasyncio.Event.wait()直到信号来自响应

import random
import asyncio
import shortuuid

n_tests = 10

idxs = list(range(n_tests))

req_ids = []
for _ in range(n_tests):
req_ids.append(shortuuid.uuid())

res_dict = {}

async def fetch_correct_res(req_id, event):
await event.wait()
res = res_dict[req_id]['result']
return res

async def handler(req, loop):
print("incoming request id: {}".format(req))
event = asyncio.Event()
data = {req :{}}
res_dict.update(data)
res_dict[req]['event']=event
res_dict[req]['result']='pending'
res = await fetch_correct_res(req, event)
assert req == res, "the correct res for the req should exactly be the req itself."
print("got correct res for req: {}".format(req))

async def randomly_put_res_to_res_dict():
random.shuffle(req_ids)
for i in req_ids:
await asyncio.sleep(random.randrange(2,4))
print("req: {} is back".format(i))
if res_dict.get(i) is not None:
event = res_dict[i]['event']
res_dict[i]['result'] = i
event.set()

loop = asyncio.get_event_loop()
tasks = asyncio.gather(handler(req_ids[0], loop),
handler(req_ids[1], loop),
handler(req_ids[2], loop),
handler(req_ids[3], loop),
randomly_put_res_to_res_dict())
loop.run_until_complete(tasks)
loop.close()

上述代码的示例响应

incoming request id: NDhvBPqMiRbteFD5WqiLFE
incoming request id: fpmk8yC3iQcgHAJBKqe2zh
incoming request id: M7eX7qeVQfWCCBnP4FbRtK
incoming request id: v2hAfcCEhRPUDUjCabk45N
req: VeyvAEX7YGgRZDHqa2UGYc is back
req: M7eX7qeVQfWCCBnP4FbRtK is back
got correct res for req: M7eX7qeVQfWCCBnP4FbRtK
req: pVvYoyAzvK8VYaHfrFA9SB is back
req: soP8NDxeQKYjgeT7pa3wtG is back
req: j3rcg5Lp59pQXuvdjCAyZe is back
req: NDhvBPqMiRbteFD5WqiLFE is back
got correct res for req: NDhvBPqMiRbteFD5WqiLFE
req: v2hAfcCEhRPUDUjCabk45N is back
got correct res for req: v2hAfcCEhRPUDUjCabk45N
req: porzHqMqV8SAuttteHRwNL is back
req: trVVqZrUpsW3tfjQajJfb7 is back
req: fpmk8yC3iQcgHAJBKqe2zh is back
got correct res for req: fpmk8yC3iQcgHAJBKqe2zh

关于python asyncio 在 key 可用时通过 key 从字典中异步获取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57821695/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com