gpt4 book ai didi

scala - 使用 Scala Actor 执行 CPU 密集型任务?

转载 作者:行者123 更新时间:2023-12-04 10:56:53 33 4
gpt4 key购买 nike

假设我必须执行几个 CPU 密集型任务。例如,如果我有 4 个 CPU,我可能会创建一个由 4-5 个工作线程组成的固定大小的线程池,在队列中等待并将任务放入队列中。在 Java 中,我可以使用 java.util.concurrent (也许 ThreadPoolExecutor )来实现这个机制。

您将如何使用 Scala Actor 实现它?

最佳答案

所有参与者基本上都是由调度程序在后台执行的线程。调度程序创建一个线程池来执行大致绑定(bind)到您的核心数量的参与者。这意味着您可以为每个需要执行的任务创建一个参与者,并将其余的留给 Scala:

for(i <- 1 to 20) {
actor {
print(i);
Thread.sleep(1000);
}
}

这里的缺点是取决于任务的数量,为每个任务创建线程的成本可能非常昂贵,因为线程在 Java 中并不便宜。

创建一个有界的工作角色池,然后通过消息传递将任务分配给他们的简单方法如下:
import scala.actors.Actor._

val numWorkers = 4
val pool = (1 to numWorkers).map { i =>
actor {
loop {
react {
case x: String => println(x)
}
}
}
}

for(i <- 1 to 20) {
val r = (new util.Random).nextInt(numWorkers)
pool(r) ! "task "+i
}

我们要创建多个参与者的原因是因为单个参与者一次只处理一条消息(即任务),因此要为您的任务获得并行性,您需要创建多个。

附注:当涉及到 I/O 绑定(bind)任务时,默认调度程序变得特别重要,因为在这种情况下您肯定会想要更改线程池的大小。两个很好的博客文章详细介绍了这一点: Explore the Scheduling of Scala ActorsScala actors thread pool pitfall .

话虽如此, Akka是一个 Actor 框架,它为更高级的 Actor 工作流提供了工具,它是我将在任何实际应用程序中使用的工具。这是一个负载平衡(而不是随机)任务执​​行器:
import akka.actor.Actor
import Actor._
import akka.routing.{LoadBalancer, CyclicIterator}

class TaskHandler extends Actor {
def receive = {
case t: Task =>
// some computationally expensive thing
t.execute
case _ => println("default case is required in Akka...")
}
}

class TaskRouter(numWorkers: Int) extends Actor with LoadBalancer {
val workerPool = Vector.fill(numWorkers)(actorOf[TaskHandler].start())
val seq = new CyclicIterator(workerPool)
}

val router = actorOf(new TaskRouter(4)).start()

for(i <- 1 to 20) {
router ! Task(..)
}

您可以拥有不同类型的负载平衡(CyclicIterator 是循环分发),因此您可以查看文档 here了解更多信息。

关于scala - 使用 Scala Actor 执行 CPU 密集型任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6466878/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com