gpt4 book ai didi

multithreading - 提高 scala .par 操作的并行度

转载 作者:行者123 更新时间:2023-12-03 12:54:06 26 4
gpt4 key购买 nike

当我调用 par在集合上,它似乎创建了大约 5-10 个线程,这对于 CPU 密集型任务来说很好。

但有时我的任务是 IO 绑定(bind)的,在这种情况下,我希望同时从 IO 拉出 500-1000 个线程 - 执行 10-15 个线程非常慢,而且我看到我的 CPU 大多处于空闲状态。

我怎样才能做到这一点?

最佳答案

您可以将阻塞 io 操作包装在 blocking 中堵塞:

(0 to 1000).par.map{ i =>
blocking {
Thread.sleep(100)
Thread.activeCount()
}
}.max // yield 67 on my pc, while without blocking it's 10

但是你应该问自己一个问题,是否应该为 IO 操作使用并行集合。他们的用例是执行 CPU 繁重的任务。

我建议您考虑使用 future 进行 IO 调用。

您还应该考虑为该任务使用自定义执行上下文,因为全局执行上下文是公共(public)单例,您无法控制使用它的代码和用途。如果您使用外部库中的所有线程,您很容易使由外部库创建的并行计算饿死。
// or just use scala.concurrent.ExecutionContext.Implicits.global if you don't care
implicit val blockingIoEc: ExecutionContextExecutor = ExecutionContext.fromExecutor(
Executors.newCachedThreadPool()
)

def fetchData(index: Int): Future[Int] = Future {
//if you use global ec, then it's required to mark computation as blocking to increase threads,
//if you use custom cached thread pool it should increase thread number even without it
blocking {
Thread.sleep(100)
Thread.activeCount()
}
}

val futures = (0 to 1000).map(fetchData)

Future.sequence(futures).onComplete {
case Success(data) => println(data.max) //prints about 1000 on my pc
}

Thread.sleep(1000)

编辑

也可以使用自定义 ForkJoinPool 使用 ForkJoinTaskSupport :
import java.util.concurrent.ForkJoinPool //scala.concurrent.forkjoin.ForkJoinPool is deprecated
import scala.util.Random
import scala.collection.parallel

val fjpool = new ForkJoinPool(2)
val customTaskSupport = new parallel.ForkJoinTaskSupport(fjpool)

val numbers = List(1,2,3,4,5).par

numbers.tasksupport = customTaskSupport //assign customTaskSupport

关于multithreading - 提高 scala .par 操作的并行度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56541362/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com