- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试在 iPyParallel 中将多个任务链接在一起,例如
import ipyparallel
client = ipyparallel.Client()
view = client.load_balanced_view()
def task1(x):
## Do some work.
return x * 2
def task2(x):
## Do some work.
return x * 3
def task3(x):
## Do some work.
return x * 4
results1 = view.map_async(task1, [1, 2, 3])
results2 = view.map_async(task2, results1.get())
results3 = view.map_async(task3, results2.get())
但是,此代码不会提交任何任务 2,除非任务 1 已完成并且基本上处于阻塞状态。我的任务可能需要不同的时间,而且效率很低。 有没有一种简单的方法可以有效地链接这些步骤,并且引擎可以从前面的步骤中获取结果?像这样:
def task2(x):
## Do some work.
return x.get() * 3 ## Get AsyncResult out.
def task3(x):
## Do some work.
return x.get() * 4 ## Get AsyncResult out.
results1 = [view.apply_async(task1, x) for x in [1, 2, 3]]
results2 = []
for x in result1:
view.set_flags(after=x.msg_ids)
results2.append(view.apply_async(task2, x))
results3 = []
for x in result2:
view.set_flags(after=x.msg_ids)
results3.append(view.apply_async(task3, x))
显然,这将失败,因为 AsyncResult 不可选取。
我在考虑几个解决方案:
使用 view.map_async(ordered=False)。
results1 = view.map_async(task1, [1, 2, 3], ordered=False)
for x in results1:
results2.append(view.apply_async(task2, x.get()))
但是这必须等待所有任务 1 完成才能提交任何任务 3。还在阻塞。
使用异步。
@asyncio.coroutine
def submitter(x):
result1 = yield from asyncio.wrap_future(view.apply_async(task1, x))
result2 = yield from asyncio.wrap_future(view.apply_async(task2, result1)
result3 = yield from asyncio.wrap_future(view.apply_async(task3, result2)
yield result3
@asyncio.coroutine
def submit_all(ls):
jobs = [submitter(x) for x in ls]
results = []
for async_r in asyncio.as_completed(jobs):
r = yield from async_r
results.append(r)
## Do some work, like analysing results.
它可以正常工作,但是当引入更复杂的任务时,代码很快就会变得困惑和不直观。
感谢您的帮助。
最佳答案
IPython parallel 在这方面并不是最好的,因为连接必须在客户端级别完成。在提交结果之前,您必须等待结果完成并返回给客户端。本质上,您的 asyncio submit_all 是为 IPython 并行执行此操作的正确方法。您可以通过编写一个 chain
函数来获得更通用的东西,该函数使用 add_done_callback
在前一个任务完成时提交新任务:
from concurrent.futures import Future
from functools import partial
def chain_apply(view, func, future):
"""Chain a call to view.apply(func, future.result()) when future is ready.
Returns a Future for the subsequent result.
"""
f2 = Future()
# when f1 is ready, submit a new task for func on its result
def apply_func(f):
if f.exception():
f2.set_exception(f.exception())
return
print('submitting %s(%s)' % (func.__name__, f.result()))
ar = view.apply_async(func, f.result())
# when ar is done, pass through the result to f2
ar.add_done_callback(lambda ar: f2.set_result(ar.get()))
future.add_done_callback(apply_func)
return f2
def chain_map(view, func, list_of_futures):
"""Chain a new callback on a list of futures."""
return [ chain_apply(view, func, f) for f in list_of_futures ]
# use builtin map with apply, since we want one Future per item
results1 = map(partial(view.apply, task1), [1, 2, 3])
results2 = chain_map(view, task2, results1)
results3 = chain_map(view, task3, results2)
print("Waiting for results")
[ r.result() for r in results3 ]
与 add_done_callback
的任何示例一样,它可以用协程编写,但我发现这种情况下的回调很好。这至少应该是一个相当通用的实用程序,您可以使用它来编写您的管道。
全面披露:我是 IPython Parallel 的主要作者,即将建议您使用其他工具。
可以在 IPython 中通过引擎 namespace 和 DAG 依赖项将结果从一个任务传递到另一个任务,但老实说,如果您的工作流程看起来像这样,您应该考虑使用 dask distributed ,专门为这种计算图设计的。如果您已经习惯并熟悉 IPython parallel,那么开始使用 dask 应该不会是一个太大的负担。
IPython 5.1 提供了一个方便的命令,可将您的 IPython 并行集群转变为 dask 分布式集群:
import ipyparallel as ipp
client = ipp.Client()
executor = client.become_distributed(ncores=1)
然后 dask 的关键相关特性是您可以将 futures 作为参数提交给后续的 map 调用,当结果准备好时调度程序会处理它,而不必在客户端中显式地执行:
results1 = executor.map(task1, [1, 2, 3])
results2 = executor.map(task2, results1)
results3 = executor.map(task3, results2)
executor.gather(results3)
所以基本上,当您需要像这样链接事物时,dask distributed 会按照您希望 IPython parallel 的负载平衡工作的方式工作。
This notebook说明了这两个例子。
关于python - 如何有效地链接 ipyparallel 任务并将中间结果传递给引擎?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37998484/
我试图了解如何使用 ipyparallel 和 jupyter 笔记本进行分布式处理,所以我做了一些测试并得到了奇怪的结果。 from ipyparallel import Client %px im
我有一个循环,每次迭代都会执行大量 CPU 密集型计算并将结果附加到列表中。 我怎样才能让它并行工作。在C#上有并发容器,它在ipyparallel中如何工作? 来自 ipyparallel 文档:
我正在尝试在 iPyParallel 中将多个任务链接在一起,例如 import ipyparallel client = ipyparallel.Client() view = client.loa
我在不同参数的循环中运行相同的模拟。每个模拟都使用一个 pandas DataFrame (data),它只被读取,从不被修改。使用 ipyparallel(IPython 并行),我可以在模拟开始之
我在一个 ipython notebook 中运行一个模拟,它由七个相互依赖的函数组成,需要 13 个不同的参数。一些函数在其他函数中被调用,以允许一个函数运行整个模拟。模拟涉及操作两个参数,总共 >
Jupyter 或 ipyparallel 下散布着文档,但没有一个单独的文档说明从头到尾的整个过程。我真的很困惑。有没有人可以分享一下经验? 这是关于我的环境的检查。 $ipcluster --ve
我使用 ipyparallel 的映射函数(如 Jupyter Notebook 中的函数)运行了相当耗时(>3 天)的模拟 from ipyparallel import Client rc = C
说到ipyparallel,是否可以指定多个ipengines在从机上同时启动,如果可以,我该怎么做? 例如,可以使用ipcluster start -n命令指定要在本地主机上启动的多个引擎,即$ i
我正在努力了解如何使用 ipython/ipyparallel 设置分布式 MPI 集群。我没有很强的 MPI 背景。 我已按照 ipyparallel 文档中的以下说明进行操作 (Using ipc
我正在尝试使用 ipyparallel 训练多个 RandomForest 分类器。我的设计是嵌套的 CV 循环,外部 CV 用于移除方差,内部 GridSearchCV 内置 (n_jobs = -
我有以下几点: ipyparallel (5.0.0) ipython (4.0.3) 我通过在命令行中键入来启用 ipcluster: ipcluster nbextension enable 我正
我发现很难弄清楚如何使用 jupyter 实验室的 ipyparallel 并行执行两个函数。有人可以给我一个应该如何完成的例子吗?例如同时运行这两个函数: import time def foo()
我发现很难弄清楚如何使用 jupyter 实验室的 ipyparallel 并行执行两个函数。有人可以给我一个应该如何完成的例子吗?例如同时运行这两个函数: import time def foo()
我刚开始使用 ipyparallel,我正在使用 VS2017 并将其导入为; import ipyparallel as ipp 然后尝试使用; def main(): rc = ipp.C
是否有任何理由将 Ipyparallel 用于常见的 python 脚本(不是 ipython 笔记本)? 最佳答案 您可能会选择 IPython parallel 的原因可能与您相关,也可能不相关:
我是一名优秀的程序员,十分优秀!