gpt4 book ai didi

Scala 调度员- worker Actor 模式

转载 作者:行者123 更新时间:2023-12-04 05:20:26 28 4
gpt4 key购买 nike

我正在尝试使用标准的 scala.actors 包为 Scala 设计一个调度员- worker 角色模式。

调度员从 java.util.concurrent.LinkedBlockingQueue 接收工作并将其发送给工作人员进行处理。当所有的工作完成后,调度员应该告诉每个 worker 退出,然后它也应该退出。这是我想出的代码,但在所有工作完成后它会挂起(我认为调度程序的队列中有待处理的 'GiveMeWork 消息):

import java.util.concurrent.LinkedBlockingQueue
import scala.actors.Actor

object Dispatcher
extends Actor {
println("Dispatcher created")

def act() {
val workers = (1 to 4).map(id => (new Worker(id)).start())

loop {
react {
case 'GiveMeWork =>
// println("Worker asked for work")
val (time, i) = workQueue.take()
if (time == 0) {
println("Quitting time")
workers.foreach(_ !? 0L)
} else {
println("Arrival at dispatcher: i: " + i + " dispatch time: " +
time + ", elapsed: " + (System.nanoTime() - time))
sender ! time
}
case 'Quit =>
println("Told to quit")
sender ! 'OffDuty
exit()
}
}
}
}

class Worker(id: Int)
extends Actor {
println("Worker(" + id + ") created")
var jobs = 0

def act() {
Dispatcher ! 'GiveMeWork

loop {
react {
case time: Long =>
if (time == 0) {
println("Worker(" + id + ") completed " + jobs + " jobs")
sender ! 'OffDuty
exit()
} else {
println("Arrival at worker(" + id + "): dispatch time: " +
time + ", elapsed: " + (System.nanoTime() - time))
Thread.sleep(id)
jobs += 1
Dispatcher ! 'GiveMeWork
}
}
}
}
}

val workQueue = new LinkedBlockingQueue[(Long, Int)](1000)

Dispatcher.start()

for (i <- 0 until 5000) {
Thread.sleep(1)
workQueue.put((System.nanoTime(), i))
}

workQueue.put((0L, 0))

println("Telling Dispatcher to quit")
Dispatcher !? 'Quit

最佳答案

有一场比赛:

val (time, i) = workQueue.take()

所有的工作都完成了,包括 workQueue.put((0L, 0)) ,所以它将永远等待。

同时使用不同类型的并发是一个坏主意。

Dispatcher 可以通知任务源任务限制:
import scala.actors.{Actor, OutputChannel}
import scala.collection.mutable.Queue

case class Task(time: Long, i: Int)
case object GiveMeWork
case object Quit
case object OffDuty

object Dispatcher extends Actor {
println("Dispatcher created")

def act() {
val workers = (1 to 4).map(id => (new Worker(id)).start())
val waitingWorkers = Queue[OutputChannel[Any]](workers: _*)
val tasks = Queue[Task]()
var workSender: Option[OutputChannel[Any]] = None

loop {
react {
case GiveMeWork =>
if (!tasks.isEmpty) sender ! tasks.dequeue()
else waitingWorkers enqueue sender

workSender map { _ ! GiveMeWork }
workSender = None
case t: Task =>
if (!waitingWorkers.isEmpty) waitingWorkers.dequeue() ! t
else tasks enqueue t

if (tasks.length < 1000) sender ! GiveMeWork
else workSender = Some(sender)
case Quit =>
println("Told to quit")
workers.foreach{ _ ! Quit }
sender ! OffDuty
exit()
}
}
}
}

class Worker(id: Int)
extends Actor {
var jobs = 0

def act() {
loop {
react {
case t: Task =>
Thread.sleep(id)
jobs += 1
Dispatcher ! GiveMeWork
case Quit =>
println("Worker(" + id + ") completed " + jobs + " jobs")
sender ! OffDuty
exit()
}
}
}
}

Dispatcher.start()

for (i <- 0 until 5000) {
Thread.sleep(1)
Dispatcher !? Task(System.nanoTime(), i)
}

println("Telling Dispatcher to quit")
Dispatcher !? Quit

关于Scala 调度员- worker Actor 模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13744480/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com