gpt4 book ai didi

python - 异步 : collecting results from an async function in an executor

转载 作者:太空宇宙 更新时间:2023-11-03 13:57:47 25 4
gpt4 key购买 nike

我想启动大量 HTTP 请求并在所有请求返回后收集它们的结果。使用 asyncio 可以以非阻塞方式发送请求,但我在收集它们的结果时遇到了问题。

我知道诸如 aiohttp 之类的解决方案是针对这个特定问题而制作的。但是 HTTP 请求只是一个例子,我的问题是如何正确使用 asyncio

在服务器端,我有一个 flask,它用“Hello World!”回答对 localhost/ 的每个请求,但它在回答之前等待 0.1 秒。在我的所有示例中,我发送了 10 个请求。同步代码大约需要 1 秒,异步版本可以在 0.1 秒内完成。

在客户端,我想同时启动许多请求并收集它们的结果。我试图以三种不同的方式做到这一点。由于 asyncio 需要执行程序来绕过阻塞代码,因此所有方法都调用 loop.run_in_executor

此代码在他们之间共享:

import requests
from time import perf_counter
import asyncio

loop = asyncio.get_event_loop()

async def request_async():
r = requests.get("http://127.0.0.1:5000/")
return r.text

def request_sync():
r = requests.get("http://127.0.0.1:5000/")
return r.text

方法一:

对任务列表使用 asyncio.gather(),然后使用 run_until_complete。看完Asyncio.gather vs asyncio.wait ,似乎 gather 会等待结果。但事实并非如此。所以这段代码立即返回,无需等待请求完成。如果我在这里使用阻塞函数,这就有效。为什么我不能使用异步函数?

# approach 1
start = perf_counter()
tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_async)) # <---- using async function !

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.003

# approach 1(B)
start = perf_counter()
tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_sync)) # <---- using sync function

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)

stop = perf_counter()
print(f"finished {stop - start}") # 0.112

Python 甚至警告我从未等待 coroutine "request_async"。在这一点上,我有一个可行的解决方案:在执行程序中使用普通(非异步)函数。但我想要一个适用于 async 函数定义的解决方案。因为我想在其中使用 await(在这个简单的示例中没有必要,但是如果我将更多代码移动到 asyncio,我相信它会变得很重要).

方法 2:

Python 警告我永远不会等待我的协程。所以让我们等待他们。方法 2 将所有代码包装到外部异步函数中并等待收集的结果。同样的问题,也立即返回(同样的警告):

# approach 2
async def main():

tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_async))

gathered_tasks = asyncio.gather(*tasks)

return await gathered_tasks # <-------- here I'm waiting on the coroutine

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}") # 0.0036

这让我很困惑。我正在等待 gather 的结果。直觉上应该传播到我正在收集的协程。但是 python 仍然提示我的协程从未等待。

我又读了一些,发现:How could I use requests in asyncio?

这几乎就是我的示例:组合 requestsasyncio。这让我想到了方法 3:

方法 3:

与方法 2 相同的结构,但是分别等待分配给 run_in_executor() 的每个任务(当然这算作等待协程):

# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():

tasks = []
for i in range(10):
task = loop.run_in_executor(None, request_async)
tasks.append(task)

responses = []
for task in tasks:
response = await task
responses.append(response)

return responses

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()

print(f"finished {stop - start}") # 0.004578

我的问题是:我想在我的协同程序中包含阻塞代码并与执行程序并行运行它们。我如何获得他们的结果?

最佳答案

My question is: I want to have blocking code in my coroutines and run them in parallel with an executor. How do I get their results ?

答案是您不应该在协程中包含阻塞代码。如果必须拥有它,则必须使用 run_in_executor 将其隔离。所以使用 requests 编写 request_async 的正确方法是:

async def request_async():
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, request_sync)

request_async 传递给 run_in_executor 没有意义,因为 run_in_executor 的整个 point 是调用一个sync 在不同的线程中运行。如果你给它一个协程函数,它会很高兴地调用它(在另一个线程中)并提供返回的协程对象作为“结果”。这就像将生成器传递给需要普通函数的代码 - 是的,它会很好地调用生成器,但它不知道如何处理返回的对象。

一般来说,您不能只将 async 放在 def 前面并期望获得可用的协程。协程不得阻塞,除非等待其他异步代码。

一旦有了可用的request_async,就可以像这样收集它的结果:

async def main():
coros = [request_async() for _i in range(10)]
results = await asyncio.gather(*coros)
return results

results = loop.run_until_complete(main())

关于python - 异步 : collecting results from an async function in an executor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53263596/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com