gpt4 book ai didi

Scala 并行无序迭代器

转载 作者:行者123 更新时间:2023-12-02 00:11:55 24 4
gpt4 key购买 nike

我有一个 Iterable 需要执行的“工作单元”,没有特定的顺序,并且可以很容易地并行运行而不会相互干扰。

不幸的是,一次运行太多它们会超出我的可用 RAM,因此我需要确保在任何给定时间只有少数几个同时运行。

最基本的,我想要一个这种类型签名的函数:

parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B]

这样输出 Iterator 不一定与输入的顺序相同(如果我想知道结果来自哪里,我可以输出一对输入或其他东西。 ) 然后,消费者可以增量地使用生成的迭代器,而不会耗尽机器的所有内存,同时为该任务保持尽可能多的并行性。

此外,我希望函数尽可能高效。例如,我最初的想法是按照以下几行做一些事情:

xs.iterator.grouped(chunkSize).flatMap(_.toSet.par.map(f).iterator)

我希望 toSet 会通知 Scala 的并行集合,它可以在元素准备就绪后立即开始从其迭代器中以任何顺序生成元素,并且 grouped 调用是为了限制同时工作的人数。不幸的是,它看起来不像 toSet 调用达到了预期的效果(结果返回的顺序与没有 par 调用的顺序相同,在我的experiments,) 并且 grouped 调用不是最优的。例如,如果我们的组大小为 100,其中 99 个作业立即在十几个核心上完成,但其中一个特别慢,其余大部分核心将处于空闲状态,直到我们可以移动到下一个组。如果有一个最多与我的 block 大小一样大的“自适应窗口”,但又不会被慢速工作人员阻止,那就更干净了。

我可以设想自己使用工作窃取 (de) 队列或类似的东西编写类似的东西,但我想我已经在某种程度上完成了处理并发原语的大量艰苦工作在 Scala 的并行集合库中。有谁知道我可以重用它的哪些部分来构建这个功能,或者对如何实现这样的操作有其他建议?

最佳答案

并行集合框架允许您指定用于给定任务的最大线程数。使用 scala-2.10,你会想要做:

def parMap[A,B](x : Iterable[A], f : A => B, chunkSize : Int) = {
val px = x.par
px.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(chunkSize))
px map f
}

这将防止在任何时候运行超过 chunkSize 的操作。这在底层使用了工作窃取策略来保持 Actor 的工作,因此不会遇到与上面的 grouped 示例相同的问题。

但是,这样做不会将结果重新排序为先完成的顺序。为此,我建议将您的操作变成一个 Actor ,并让一个小型 Actor 池运行这些操作,然后在它们完成时将结果发回给您。

关于Scala 并行无序迭代器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14731852/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com