gpt4 book ai didi

python - 惯用地从 future 指令中收集结果

转载 作者:太空狗 更新时间:2023-10-29 18:12:40 25 4
gpt4 key购买 nike

我正在尝试编写尽可能地道的东西,以从存储在字典中的 future 中收集结果。

假设我有以下代码:

import asyncio

async def sleep(seconds):
print(f'sleeping for {seconds} seconds')
await asyncio.sleep(seconds)
print(f'finished sleeping {seconds} seconds')


async def run():
tasks = {
'4': sleep(4),
'3': sleep(3),
'2': sleep(2),
'1': sleep(1),
}
print(await gather_from_dict(tasks))


if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(run())

我期望的输出是:

sleeping for 2 seconds
sleeping for 1 seconds
sleeping for 4 seconds
sleeping for 3 seconds
finished sleeping 1 seconds
finished sleeping 2 seconds
finished sleeping 3 seconds
finished sleeping 4 seconds
{'4': None, '3': None, '2': None, '1': None}

到目前为止,我找到的最干净的解决方案是:

async def gather_from_dict(tasks:Dict[Hashable, Awaitable],
loop=None, return_exceptions=False) -> Dict:

results = await asyncio.gather(
*tasks.values(),
loop=loop,
return_exceptions=return_exceptions
)
return dict(zip(tasks.keys(), results))

关于如何以更简单的方式执行此操作的任何想法?谢谢!!!

最佳答案

我重新定义了你的任务,让它更像协程列表,并且更喜欢从 run_until_complete 方法中获取结果,代码如下,并注意到我在你的 sleep 代码中返回了一些东西,在你的代码中,你实际上返回 None .

import asyncio


async def sleep(seconds):
print('sleeping for {seconds} seconds'.format(seconds=seconds))
await asyncio.sleep(seconds)
print('finished sleeping {seconds} seconds'.format(seconds=seconds))
return {seconds: 'value of {seconds}'.format(seconds=seconds)}


if __name__ == '__main__':

loop = asyncio.get_event_loop()

tasks = [sleep(i) for i in range(1, 5)]

finished, _ = loop.run_until_complete(
asyncio.wait(tasks))

result = {}

for task in finished:
result.update(task.result())

print(result)
loop.close()

关于python - 惯用地从 future 指令中收集结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48151386/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com