- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我想启动大量 HTTP 请求并在所有请求返回后收集它们的结果。使用 asyncio
可以以非阻塞方式发送请求,但我在收集它们的结果时遇到了问题。
我知道诸如 aiohttp 之类的解决方案是针对这个特定问题而制作的。但是 HTTP 请求只是一个例子,我的问题是如何正确使用 asyncio
。
在服务器端,我有一个 flask,它用“Hello World!”回答对 localhost/
的每个请求,但它在回答之前等待 0.1 秒。在我的所有示例中,我发送了 10 个请求。同步代码大约需要 1 秒,异步版本可以在 0.1 秒内完成。
在客户端,我想同时启动许多请求并收集它们的结果。我试图以三种不同的方式做到这一点。由于 asyncio 需要执行程序来绕过阻塞代码,因此所有方法都调用 loop.run_in_executor
。
此代码在他们之间共享:
import requests
from time import perf_counter
import asyncio
loop = asyncio.get_event_loop()
async def request_async():
r = requests.get("http://127.0.0.1:5000/")
return r.text
def request_sync():
r = requests.get("http://127.0.0.1:5000/")
return r.text
方法一:
对任务列表使用 asyncio.gather()
,然后使用 run_until_complete
。看完Asyncio.gather vs asyncio.wait ,似乎 gather 会等待结果。但事实并非如此。所以这段代码立即返回,无需等待请求完成。如果我在这里使用阻塞函数,这就有效。为什么我不能使用异步函数?
# approach 1
start = perf_counter()
tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_async)) # <---- using async function !
gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.003
# approach 1(B)
start = perf_counter()
tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_sync)) # <---- using sync function
gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)
stop = perf_counter()
print(f"finished {stop - start}") # 0.112
Python 甚至警告我从未等待 coroutine "request_async"
。在这一点上,我有一个可行的解决方案:在执行程序中使用普通(非异步)函数。但我想要一个适用于 async
函数定义的解决方案。因为我想在其中使用 await
(在这个简单的示例中没有必要,但是如果我将更多代码移动到 asyncio
,我相信它会变得很重要).
方法 2:
Python 警告我永远不会等待我的协程。所以让我们等待他们。方法 2 将所有代码包装到外部异步函数中并等待收集的结果。同样的问题,也立即返回(同样的警告):
# approach 2
async def main():
tasks = []
for i in range(10):
tasks.append(loop.run_in_executor(None, request_async))
gathered_tasks = asyncio.gather(*tasks)
return await gathered_tasks # <-------- here I'm waiting on the coroutine
start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}") # 0.0036
这让我很困惑。我正在等待 gather
的结果。直觉上应该传播到我正在收集的协程。但是 python 仍然提示我的协程从未等待。
我又读了一些,发现:How could I use requests in asyncio?
这几乎就是我的示例:组合 requests
和 asyncio
。这让我想到了方法 3:
方法 3:
与方法 2 相同的结构,但是分别等待分配给 run_in_executor()
的每个任务(当然这算作等待协程):
# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():
tasks = []
for i in range(10):
task = loop.run_in_executor(None, request_async)
tasks.append(task)
responses = []
for task in tasks:
response = await task
responses.append(response)
return responses
start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}") # 0.004578
我的问题是:我想在我的协同程序中包含阻塞代码并与执行程序并行运行它们。我如何获得他们的结果?
最佳答案
My question is: I want to have blocking code in my coroutines and run them in parallel with an executor. How do I get their results ?
答案是您不应该在协程中包含阻塞代码。如果必须拥有它,则必须使用 run_in_executor
将其隔离。所以使用 requests
编写 request_async
的正确方法是:
async def request_async():
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, request_sync)
将 request_async
传递给 run_in_executor
没有意义,因为 run_in_executor
的整个 point 是调用一个sync 在不同的线程中运行。如果你给它一个协程函数,它会很高兴地调用它(在另一个线程中)并提供返回的协程对象作为“结果”。这就像将生成器传递给需要普通函数的代码 - 是的,它会很好地调用生成器,但它不知道如何处理返回的对象。
一般来说,您不能只将 async
放在 def
前面并期望获得可用的协程。协程不得阻塞,除非等待其他异步代码。
一旦有了可用的request_async
,就可以像这样收集它的结果:
async def main():
coros = [request_async() for _i in range(10)]
results = await asyncio.gather(*coros)
return results
results = loop.run_until_complete(main())
关于python - 异步 : collecting results from an async function in an executor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53263596/
是否有带有索引的.collect?我想做这样的事情: def myList = [ [position: 0, name: 'Bob'], [position: 0, name: 'J
我创建了一个 Collection 类,它扩展了 ArrayList 以添加一些有用的方法。它看起来像这样: public class Collection extends ArrayList {
我知道如果我有元素,我想得到 List/Set/Map 我可以调用这个元素: Collections.singleton()/Collections.singletonList()/Collectio
我刚刚在我的 pom 文件中看到 Apache commons-collections 有两个不同的组 ID: commons-collections commons-collect
我们可以对所有 Collections 类型的对象(如 Set 和 List)使用 Collections.synchronizedCollection(Collection c),这就是为什么我们有
我有List>我想让它把上一个集合中的所有人复制到List收藏。 我是这样做的: var People = new List>{ new List{...},... };
我想做的是使用良好的旧循环非常简单。 假设我有一个包含 B 列表的对象 A。 public class A { public List myListOfB; } 在其他一些方法中,我有一个 As
在 Capgemini 的采访中,我被问到一个我无法回答的问题。所有集合类和接口(interface)共有的那些方法是什么? 最佳答案 所有 java 对象类(包括所有集合)都派生自名为 Object
我有一系列存储估计信息的数据库表。当设置某些边界时,我试图从所有数据库表中返回所有数据。 收藏 $estimateItems = new Collection(); $esti
为什么 Haskell 实现如此专注于链表? 例如,我知道 Data.Sequence 效率更高 大多数列表操作(cons 操作除外),并且被大量使用; 但是,从语法上讲,它“几乎不受支持”。 Has
我试图简单地将我在 PHP 中请求的内容返回到 JSON。我的问题是每个库存尚未完成。事实上,它是“渲染”,但“this.collection.models”尚未完成,因为请求尚未完成。 我应该如何解
本质上,作为Powershell脚本的一部分,我需要实现广度优先搜索。因此,我需要队列,并且认为System.Collections.Queue与其他任何队列一样好。但是,当我从队列中取出一个对象时,
已关闭。这个问题是 off-topic 。目前不接受答案。 想要改进这个问题吗? Update the question所以它是 on-topic用于堆栈溢出。 已关闭10 年前。 Improve t
嗨,我不明白为什么这不起作用? Notifications.update({'userId':Meteor.userId(), 'notifyUserId':notifyFriendId}, {$se
假设我有一个闭包: def increment = {value, step -> value + step } 现在我想遍历我的整数集合的每个项目,用 5 递增,并将新元素保存到一个新集合中:
使用逐页 View 时,我的 plone 集合文件夹未显示所有项目。基本上我有 9 页包含元素,但第 6 - 8 页显示的内容完全相同。因此,并非所有项目都会显示,即使项目总数对应于应该在集合中的元素
private Map> map ,其中 ProgramCourse 是我的项目中的域类,上面的 map 是我运行项目时域类 Program 的字段以下异常即将到来。 Use of @OneToMan
三者的主要区别是什么?现在,我想分别使用字符串/字符串创建一个键/值对。这三个似乎都有我可以使用的选项。 编辑:我只想创建一个简单的哈希表 - 没什么特别复杂的。 最佳答案 通用集合几乎完全取代了基础
我正在为 NodeJs 使用 mongodb 驱动程序,其中有 3 个方法: 1) db.collection.insert 2) 数据库.collection.insertOne 3) db.col
我有一个集合,我正在尝试使用 Distinct 方法删除重复项。 public static Collection imagePlaylist imagePlaylist = imagePlaylis
我是一名优秀的程序员,十分优秀!