gpt4 book ai didi

python - 在 Dask 中,是否有一种方法可以在依赖项可用时对其进行处理,例如 multiprocessing.imap_unordered?

转载 作者:行者123 更新时间:2023-12-04 10:03:00 26 4
gpt4 key购买 nike

我有一个简单的图结构,它接受 N 个独立的任务,然后聚合它们。我不在乎独立任务的结果以什么顺序聚合。有没有一种方法可以通过在依赖项可用时对其进行操作来加快计算速度?

考虑以下示例。在其中,并行任务每个都等待一些随机时间,然后返回。另一个任务收集结果,形成一个有序队列。如果收集异步发生,则顺序将基于任务完成的时间。如果收集同步发生,那么顺序将由输入静态定义。

from multiprocessing import Pool
from dask import delayed
import numpy as np
from time import sleep

def wait(i):
"""Something embarrassingly parallel"""
np.random.seed()
t = np.random.uniform()
sleep(t)
print(i, t)
return i, t

def lineup(who_when):
"""Aggregate"""
order = []
for who, when in who_when:
print(f'who: {who}')
order.append(who)
return order

使用 imap_unordered,我们看到收集/减少在所有依赖项完成之前尽快开始。
n = 5
pool = Pool(processes=n)
lineup(pool.imap_unordered(wait, range(n)))

# Produces something like the following

2 0.2837069069881948
4 0.44156753704276597
who: 2
who: 4
1 0.5563172244950703
0 0.6696008076879393
who: 1
who: 0
3 0.9911326214345308
who: 3
[2, 4, 1, 0, 3]

使用 dask.delayed,按照我习惯的方式,结果类似于 map(),一旦所有依赖项都可用,就会开始收集。顺序是静态的。
n = 5
order = delayed(lineup)([delayed(wait)(i) for i in range(n)])
order.compute()

# produces something like:

0 0.2792789023871932
2 0.44570072028850705
4 0.6969597596416385
1 0.766705306208266
3 0.9889956337687371
who: 0
who: 1
who: 2
who: 3
who: 4
[0, 1, 2, 3, 4]

dask 中是否有 imap_unordered 等价物?也许使用 dask.bag 的东西?

最佳答案

是的。您可能正在寻找 as_completed Dask Futures interface的功能.

Handling Evolving Workflows 上有一个 Dask 示例

为方便起见,我将在此处复制 as_completed 的文档字符串

已完成

按照完成的顺序返回 future

这将返回一个迭代器,该迭代器按照它们完成的顺序生成输入的 future 对象。无论顺序如何,在迭代器上调用 next 都会阻塞,直到下一个 future 完成。

此外,您还可以在计算过程中使用 .add 方法向该对象添加更多 future

参数

future : future 的集合
按完成顺序迭代的 Future 对象列表

with_results: bool (False)
是否等待并包括 future 结果;在这种情况下 as_completed 产生一个 (future, result) 的元组

raise_errors: bool (True)
当 future 的结果引发异常时,我们是否应该引发;仅在 with_results=True 时影响行为。

例子

>>> x, y, z = client.map(inc, [1, 2, 3])  # doctest: +SKIP
>>> for future in as_completed([x, y, z]): # doctest: +SKIP
... print(future.result()) # doctest: +SKIP
3
2
4

在计算过程中添加更多 future

>>> x, y, z = client.map(inc, [1, 2, 3])  # doctest: +SKIP
>>> ac = as_completed([x, y, z]) # doctest: +SKIP
>>> for future in ac: # doctest: +SKIP
... print(future.result()) # doctest: +SKIP
... if random.random() < 0.5: # doctest: +SKIP
... ac.add(c.submit(double, future)) # doctest: +SKIP
4
2
8
3
6
12
24

可选择等待直到结果也已收集

>>> ac = as_completed([x, y, z], with_results=True)  # doctest: +SKIP
>>> for future, result in ac: # doctest: +SKIP
... print(result) # doctest: +SKIP
2
4
3

关于python - 在 Dask 中,是否有一种方法可以在依赖项可用时对其进行处理,例如 multiprocessing.imap_unordered?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61742048/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com