gpt4 book ai didi

scala - 从线程模型到参与者模型的转变

转载 作者:行者123 更新时间:2023-12-02 18:09:50 34 4
gpt4 key购买 nike

尝试掌握如何从参与者而不是线程的角度进行思考。我对以下用例有点困惑:

Consider a system that has a producer process that creates work (e.g. by reading data from a file), and a number of worker processes that consume the work (e.g. by parsing the data and writing it to a database). The rates at which work is produced and consumed can vary, and the system should remain robust to this. For example, if the workers can't keep up, the producer should detect this and eventually slow down or wait.

这很容易用线程实现:

val producer:Iterator[Work] = createProducer()
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE)
val workers = (0 until NUM_WORKERS) map { i =>
new Thread() {
override def run() = {
while (true) {
try {
// take next unit of work, waiting if necessary
val work = queue.take()
process(work)
}
catch {
case e:InterruptedException => return
}
}
}
}
}

// start the workers
workers.foreach(_.start())

while (producer.hasNext) {
val work = producer.next()
// add new unit of work, waiting if necessary
queue.put(work)
}

while (!queue.isEmpty) {
// wait until queue is drained
queue.wait()
}

// stop the workers
workers.foreach(_.interrupt())

这个模型并没有什么问题,而且我以前也成功地使用过它。这个示例可能过于冗长,因为使用 Executor 或 CompletionService 非常适合此任务。但我喜欢 Actor 抽象,并且认为在很多情况下它更容易推理。有没有办法使用参与者重写这个示例,特别是确保不存在缓冲区溢出(例如邮箱已满、消息丢失等)?

最佳答案

因为参与者“离线”处理消息(即消息的消费与消息的接收无关),所以很难看出如何精确模拟“生产者等待消费者 catch ”。

我唯一能想到的是消费者向生产者参与者请求工作(使用reply):

case object MoreWorkPlease
class Consumer(prod : Producer) extends Actor {
def act = {
prod ! MoreWorkPlease
loop {
react {
case Work(payload) => doStuff(payload); reply(MoreWorkPlease)
}
}
}
}

class Producer extends Actor {
def act = loop {
react {
case MoreWorkPlease => reply(Work(getNextItem))
}
}
}

当然,这并不完美,因为生产者不会“向前读取”,并且仅在消费者准备好时才开始工作。用法如下:

val prod = new Producer
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start())
prod.start()

关于scala - 从线程模型到参与者模型的转变,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4159340/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com