- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
TL; DR
有没有办法等待多个期货,并以给定的顺序完成,从而从中获利?
很长的故事
假设您有两个数据源。一个给您id -> name
映射,另一个给您id -> age
映射。您要计算(name, age) -> number_of_ids_with_that_name_and_age
。
有太多数据无法直接加载,但是两个数据源都支持通过id
进行分页/迭代和排序。
所以你写类似
def iterate_names():
for page in get_name_page_numbers():
yield from iterate_name_page(page) # yields (id, name) pairs
iterate_names()
和iterate_ages()
。asyncio.gather
发送所有请求并等待所有数据,但是然后:asyncio.as_completed
,它允许您在获取结果时发送所有请求并处理页面,但是您将使页面混乱,因此您将无法进行处理。最佳答案
您的问题中发生了很多事情。我会尝试所有这些。
有没有办法等待多个期货,并以给定的顺序完成,从而从中获利?
是。您的代码可以按顺序yield from
或await
任何数量的期货。如果您专门谈论Task
,并且希望这些任务同时执行,则只需将它们分配给循环(在asyncio.ensure_future()
或loop.create_task()
时完成),然后循环就需要运行。
至于按顺序产生它们,您可以在创建任务时首先确定该顺序。在一个简单的示例中,在您开始处理所有任务/功能之前,已经创建了所有任务/功能,可以使用list
存储任务的将来,并最终从列表中拉出:
loop = asyncio.get_event_loop()
tasks_im_waiting_for = []
for thing in things_to_get:
task = loop.create_task(get_a_thing_coroutine(thing))
tasks_im_waiting_for.append(task)
@asyncio.coroutine
def process_gotten_things(getter_tasks):
for task in getter_tasks:
result = yield from task
print("We got {}".format(result))
loop.run_until_complete(process_gotten_things(tasks_im_waiting_for))
deque
而不是
list
,其中不止一个
process_gotten_things
任务
.pop()
从
deque
。如果我们想变得更高级,可以执行
as Vincent suggests in a comment to your question并使用
asyncio.Queue
代替
deque
。使用这样的队列,您可以让生产者将任务添加到与任务处理使用者同时运行的队列中。
deque
或
Queue
排序期货以进行处理有一个缺点,那就是,与运行处理器任务时一样,您只能同时处理多个期货。您可以在每次排队要处理的新将来时创建一个新的处理器任务,但是此时,此队列成为完全冗余的数据结构,因为
asyncio已经为您提供了一个类似于队列的对象,其中添加的所有内容均得到处理并发:事件循环。 对于我们计划的每个任务,我们还可以计划其处理。修改以上示例:
for thing in things_to_get:
getter_task = loop.create_task(get_a_thing_coroutine(thing))
processor_task = loop.create_task(process_gotten_thing(getter_task))
# Tasks are futures; the processor can await the result once started
for thing in things_to_get:
task = loop.create_task(get_a_thing_coroutine(thing, loop))
@asyncio.coroutine
def get_a_thing_coroutine(thing, loop):
results = yield from long_time_database_call(thing)
subtasks = []
for result in results:
subtasks.append(loop.create_task(process_result(result)))
# With subtasks scheduled in the order we like, wait for them
# to finish before we consider THIS task complete.
yield from asyncio.wait(subtasks)
asyncio.Queue
中提取收益。
loop.create_task()
明确地安排了任务。尽管
asyncio.gather()
和
asyncio.wait()
会很高兴地将协程对象并将它们作为
Task
进行调度/包装,但是在撰写本文时,它们在以可预测的顺序进行调度方面存在问题。
See asyncio issue #432。
id
结合在一起。我提到的获取和处理这些东西的模式并不能解决这样的问题,我也不知道完美的模式是什么。我将尽我所能尝试这种方法。
# defaultdicts are great for representing knowledge that an interested
# party might want whether or not we have any knowledge to begin with:
from collections import defaultdict
# Let's start with a place to store our end goal:
name_and_age_to_id_count = defaultdict(int)
# Given we're correlating info from two sources, let's make two places to
# store that info, keyed by what we're joining on: id
# When we join correlate this info, only one side might be known, so use a
# Future on both sides to represent data we may or may not have yet.
id_to_age_future = defaultdict(loop.create_future)
id_to_name_future = defaultdict(loop.create_future)
# As soon as we learn the name or age for an id, we can begin processing
# the joint information, but because this information is coming from
# multiple sources we want to process concurrently we need to keep track
# of what ids we've started processing the joint info for.
ids_scheduled_for_processing = set()
@asyncio.coroutine
def process_name_page(page_number):
subtasks = []
for id, name in iterate_name_page(page_number):
name_future = id_to_name_future[id]
name_future.set_result(name)
if id not in ids_scheduled_for_processing:
age_future = id_to_age_future[id]
task = loop.create_task(increment_name_age_pair(id, name_future, age_future))
subtasks.append(task)
ids_scheduled_for_processing.add(id)
yield from asyncio.wait(subtasks)
@asyncio.coroutine
def process_age_page(page_number):
subtasks = []
for id, age in iterate_age_page(page_number):
age_future = id_to_age_future[id]
age_future.set_result(age)
if id not in ids_scheduled_for_processing:
name_future = id_to_name_future[id]
task = loop.create_task(increment_name_age_pair(id, name_future, age_future))
subtasks.append(task)
ids_scheduled_for_processing.add(id)
yield from asyncio.wait(subtasks)
@asyncio.coroutine
def increment_name_age_pair(id, name_future, age_future):
# This will wait until both futures are resolved and let other tasks work in the meantime:
pair = ((yield from name_future), (yield from age_future))
name_and_age_to_id_count[pair] += 1
# If memory is a concern:
ids_scheduled_for_processing.discard(id)
del id_to_age_future[id]
del id_to_name_future[id]
page_processing_tasks = []
# Interleave name and age pages:
for name_page_number, age_page_number in zip_longest(
get_name_page_numbers(),
get_age_page_numbers()
):
# Explicitly schedule it as a task in the order we want because gather
# and wait have non-deterministic scheduling order:
if name_page_number is not None:
page_processing_tasks.append(loop.create_task(process_name_page(name_page_number)))
if age_page_number is not None:
page_processing_tasks.append(loop.create_task(process_age_page(age_page_number)))
loop.run_until_complete(asyncio.wait(page_processing_tasks))
print(name_and_age_to_id_count)
asyncio
可能无法解决您的所有并行处理难题。您提到了“处理”要迭代的每个页面要花很长时间。如果由于要等待服务器的响应而花了很多时间,那么此架构是一种精巧的轻量级方法,可满足您的需求(只需确保使用异步循环感知工具来完成I / O操作)。
loop.run_in_executor
与Python解释器进程池一起使用。您还可以使用带有进程池的
concurrent.futures
library而不是使用asyncio来开发解决方案。
yield from
将生成委派给内部生成器。碰巧的是,异步协程使用相同的表达式等待将来的结果,并告诉循环它可以在需要时运行其他协程的代码。
关于python - Python asyncio:按顺序完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35699601/
我正在我的一个项目中使用 aiohttp 并想限制每秒发出的请求数。我正在使用 asyncio.Semaphore 来做到这一点。我的挑战是我可能想要增加/减少每秒允许的请求数。 例如: limit
如何混合 async with api.open() as o: ... 和 o = await api.open() 在一个功能中? 自从第一次需要带有 __aenter__ 的对象以来和
有 2 个工作:“wash_clothes”(job1) 和“setup_cleaning_robot”(job2),每个工作需要你 7 和 3 秒,你必须做到世界末日。 这是我的代码: import
我们有一种设置线程名称的方法:thread = threading.Thread(name='Very important thread', target=foo),然后在格式化程序中使用 %(thr
我有一些代码,用于抓取 URL、解析信息,然后使用 SQLAlchemy 将其放入数据库中。我尝试异步执行此操作,同时限制同时请求的最大数量。 这是我的代码: async def get_url(ai
1>Python Asyncio 未使用 asyncio.run_coroutine_threadsafe 运行新的协程。下面是在Mac上进行的代码测试。 ——————————————————————
asyncio.gather和 asyncio.wait似乎有类似的用途:我有一堆我想要执行/等待的异步事情(不一定要在下一个开始之前等待一个完成)。它们使用不同的语法,并且在某些细节上有所不同,但对
我正在尝试使用 asyncio 运行以下程序: import asyncio async def main(): print('Hello') await asyncio.sleep(
我正在尝试在事件循环之外使用协程函数。 (在这种情况下,我想在 Django 中调用一个也可以在事件循环中使用的函数) 如果不使调用函数成为协程,似乎没有办法做到这一点。 我意识到 Django 是为
我有一个假设 asyncio.gather设想: await asyncio.gather( cor1, [cor2, cor3], cor4, ) 我要 cor2和 cor3
我有多个服务器,每个服务器都是 asyncio.start_server 返回的实例。我需要我的 web_server 与 websockets 一起使用,以便能够使用我的 javascript 客户
我正在使用 Python 3 asyncio 框架评估定期执行的不同模式(为简洁起见省略了实际 sleep /延迟),我有两段代码表现不同,我无法解释原因。第一个版本使用 yield from 递归调
从事件线程外部将协程推送到事件线程的 pythonic 方法是什么? 最佳答案 更新信息: 从Python 3.7 高级函数asyncio.create_task(coro)开始was added并且
我有一个大型 (1M) 数据库结果集,我想为其每一行调用一个 REST API。 API 可以接受批处理请求,但我不确定如何分割 rows 生成器,以便每个任务处理一个行列表,比如 10。我宁愿不预先
迷失在异步中。 我同时在学习Kivy和asyncio,卡在了解决运行Kivy和运行asyncio循环的问题上,无论怎么转,都是阻塞调用,需要顺序执行(好吧,我希望我是错的),例如 loop = asy
我有这个 3.6 异步代码: async def send(command,userPath,token): async with websockets.connect('wss://127.
首先,我需要警告你:我是 asyncio 的新手,而且我是 我马上警告你,我是 asyncio 的新手,我很难想象引擎盖下的库里有什么。 这是我的代码: import asyncio semaphor
我有一个asyncio.PriorityQueue,用作网络爬虫的URL队列,当我调用url_queue.get时,得分最低的URL首先从队列中删除()。当队列达到 maxsize 项时,默认行为是阻
探索 Python 3.4.0 的 asyncio 模块,我试图创建一个类,其中包含从类外部的 event_loop 调用的 asyncio.coroutine 方法。 我的工作代码如下。 impor
我有一个可能是无用的问题,但尽管如此,我还是觉得我错过了一些对于理解 asyncio 的工作方式可能很重要的东西。 我刚刚开始熟悉 asyncio 并编写了这段非常基本的代码: import asyn
我是一名优秀的程序员,十分优秀!