- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
阅读 this answer ,我遇到了 asyncio.tasks.as_completed。我不明白该功能实际上是如何工作的。它被记录为一个非异步例程,按照它们完成的顺序返回 future 。它创建一个与事件循环关联的队列,为每个 future 添加一个完成回调,然后尝试从队列中获取与 future 一样多的项目。
核心代码如下:
def _on_completion(f):
if not todo:
return # _on_timeout() was here first.
todo.remove(f)
done.put_nowait(f)
if not todo and timeout_handle is not None:
timeout_handle.cancel()
@coroutine
def _wait_for_one():
f = yield from done.get()
if f is None:
# Dummy value from _on_timeout().
raise futures.TimeoutError
return f.result() # May raise f.exception().
for f in todo:
f.add_done_callback(_on_completion)
if todo and timeout is not None:
timeout_handle = loop.call_later(timeout, _on_timeout)
for _ in range(len(todo)):
yield _wait_for_one()
我想了解这段代码的工作原理。我最大的问题是:
循环实际在哪里运行。我没有看到任何对 loop.run_until_cobmplete 或 loop.run_forever 的调用。那么循环是如何进行的呢?
方法文档说该方法返回 futures。你可以这样调用它
对于 as_completed(futures) 中的 f: 结果 = f 的产量
我无法将其与 _wait_for_one 中的返回 f.result 行进行协调。记录的调用约定是否正确?如果是这样,那么 yield 从何而来?
最佳答案
您复制的代码缺少 header 部分,这非常重要。
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
"""Return an iterator whose values are coroutines.
When waiting for the yielded coroutines you'll get the results (or
exceptions!) of the original Futures (or coroutines), in the order
in which and as soon as they complete.
This differs from PEP 3148; the proper way to use this is:
for f in as_completed(fs):
result = yield from f # The 'yield from' may raise.
# Use result.
If a timeout is specified, the 'yield from' will raise
TimeoutError when the timeout occurs before all Futures are done.
Note: The futures 'f' are not necessarily members of fs.
"""
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
loop = loop if loop is not None else events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
from .queues import Queue # Import here to avoid circular import problem.
done = Queue(loop=loop)
timeout_handle = None
def _on_timeout():
for f in todo:
f.remove_done_callback(_on_completion)
done.put_nowait(None) # Queue a dummy value for _wait_for_one().
todo.clear() # Can't do todo.remove(f) in the loop.
def _on_completion(f):
if not todo:
return # _on_timeout() was here first.
todo.remove(f)
done.put_nowait(f)
if not todo and timeout_handle is not None:
timeout_handle.cancel()
@coroutine
def _wait_for_one():
f = yield from done.get()
if f is None:
# Dummy value from _on_timeout().
raise futures.TimeoutError
return f.result() # May raise f.exception().
for f in todo:
f.add_done_callback(_on_completion)
if todo and timeout is not None:
timeout_handle = loop.call_later(timeout, _on_timeout)
for _ in range(len(todo)):
yield _wait_for_one()
[循环实际上在哪里运行?]
为了简单起见,假设超时设置为无。
as_completed 期望可迭代的 future ,而不是协程。所以这个 future 已经绑定(bind)到循环并计划执行。换句话说,这些 future 是 loop.create_task 或 asyncio.ensure_futures 的输出(没有明确写明)。所以循环已经在“运行”它们,当它们完成时,它们 future 的 .done() 方法将返回 True。
然后创建“完成”队列。请注意,“完成”队列是 asyncio.queue 的一个实例,即实现阻塞方法(.get、.put)»使用循环« 的队列。
通过“todo = { ...”这一行,每个协程的 future (即 fs 的一个元素)都被包裹在另一个 future »绑定(bind)到循环«,并且最后一个 future 的 done_callback 被设置为调用 _on_completion 函数.
_on_completion 函数将在循环完成协程的执行时调用,协程的 future 在“fs”中传递给 as_completed 函数。
_on_completion 函数从待办事项集中删除“我们的 future ”并将其结果(即 future 在“fs”集中的协程)放入完成队列。换句话说,as_completed 函数所做的就是将这些 futures 附加到 done_callback,以便将原始 futures 的结果移入 done 队列。
然后,对于 len(fs) == len(todo) 次,as_completed 函数会生成一个协程,该协程会阻止“yield from done.get()”,等待 _on_completed(或 _on_timeout)函数放入结果进入 done 队列。
由 as_completed 调用者执行的“yield from”将等待结果出现在完成队列中。
[ yield 从何而来?]
这是因为 todo 是一个 asyncio.queue,因此您可以(asyncio-)阻塞直到队列中有一个值 .put()。
关于python - asyncio.as_completed 如何工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44049719/
阅读 this answer ,我遇到了 asyncio.tasks.as_completed。我不明白该功能实际上是如何工作的。它被记录为一个非异步例程,按照它们完成的顺序返回 future 。它创
我正在 Python 2.7 模块 multiprocessing 中寻找 as_completed 函数(来自 Python 3 concurrent.futures)的模拟。我目前的解决方案: i
我正在尝试从异步队列中提取任务,并在发生异常时调用给定的错误处理程序。排队项目以字典形式给出(由 enqueue_task 排队),其中包含任务、可能的错误处理程序以及错误处理程序可能需要的任何 ar
我正在学习 python 并发性,并且向我介绍了 future 的概念。我读到 as_completed() 获取可迭代的 futures 并在完成时产生它们。 我想知道它在内部是如何工作的。它是否立
我有一些代码,用于抓取 URL、解析信息,然后使用 SQLAlchemy 将其放入数据库中。我尝试异步执行此操作,同时限制同时请求的最大数量。 这是我的代码: async def get_url(ai
来自 asyncio docs : asyncio.as_completed(aws, *, loop=None, timeout=None) Run awaitable objects in the
我正在努力在Python中使用concurrent.futures。我正在尝试迭代大量 S3 对象。由于帐户、存储桶和对象的数量较多,这可能需要很长时间。比我的 STS 凭证有效的时间长,并且足够长,
所以,基本上,在 Python 3.7 中(据我所知)如果你尝试这样做, import asyncio async def sleep(): asyncio.sleep(1) async de
我正在尝试同时发送 HTTP 请求。为此,我使用 concurrent.futures 这是简单的代码: import requests from concurrent import futures
我在学习concurrent.futures.ThreadPoolExecutor在 Py3.6 中,对于使用之间的区别、优缺点有点困惑 1 future.add_done_callback(call
我是一名优秀的程序员,十分优秀!