
scala - Incremental processing in an akka actor

Reposted. Author: 行者123. Updated: 2023-12-04 13:33:25

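I have actors that need to do very long-running, computationally expensive work, but the computation itself can be done incrementally. So although the complete computation takes hours to finish, the intermediate results are actually very useful, and I would like to be able to answer any requests for them. Here is pseudocode of what I want to do: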

var intermediateResult = ...
loop {
  // keep computing while no message is waiting and the work is not finished
  while (mailbox.isEmpty && computationNotFinished)
    intermediateResult = computationStep(intermediateResult)

  // then handle whatever has arrived
  receive {
    case GetCurrentResult => sender ! intermediateResult
    ...other messages...
  }
}

Best answer

The best way to do this is very close to what you are already doing:

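import akka.actor.Actor

// Placeholders for the application's own types (the answer leaves them undefined)
trait IntermediateState
trait ToDo { def isEmpty: Boolean }
case class Work(x: ToDo)
case class Continue(todo: ToDo)

// abstract because calc, the actual computation step, is supplied by the application
abstract class Worker extends Actor {
  var state: IntermediateState = _
  def receive = {
    case Work(x) =>
      val (next, todo) = calc(state, x)
      state = next
      self ! Continue(todo)   // remaining work goes back through the mailbox
    case Continue(todo) if todo.isEmpty => // done: nothing left to compute
    case Continue(todo) =>
      val (next, rest) = calc(state, todo)
      state = next
      self ! Continue(rest)   // messages already waiting are handled before this one
  }
  // performs one increment of the computation and returns the work still to do
  def calc(state: IntermediateState, todo: ToDo): (IntermediateState, ToDo)
}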
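To connect this back to the question, here is a minimal runnable sketch, assuming Akka classic actors (2.5 or later), in which the incremental computation is a Fibonacci sequence and a GetCurrentResult query is answered between batches. FibWorker, its batch parameter and GetCurrentResult are hypothetical names for this illustration, and Work and Continue are simplified variants of the answer's messages; none of them are part of Akka.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical messages for this sketch
case class Work(steps: Int)          // start a computation of `steps` increments
case class Continue(remaining: Int)  // self-sent: increments still to do
case object GetCurrentResult         // query for the latest intermediate result

// Each Continue performs at most `batch` Fibonacci steps, then re-enqueues itself,
// so other messages (such as GetCurrentResult) are processed between batches.
class FibWorker(batch: Int) extends Actor {
  private var state: (BigInt, BigInt) = (BigInt(1), BigInt(1))
  private var done = 0

  def receive = {
    case Work(steps) =>
      state = (BigInt(1), BigInt(1)); done = 0
      self ! Continue(steps)
    case Continue(remaining) if remaining > 0 =>
      val n = math.min(batch, remaining)
      var i = 0
      while (i < n) { state = (state._2, state._1 + state._2); i += 1 }
      done += n
      self ! Continue(remaining - n)
    case Continue(_) => // finished; the final value stays in `state`
    case GetCurrentResult =>
      sender() ! ((done, state._1))  // steps completed so far and the current value
  }
}

object FibWorkerDemo extends App {
  val system = ActorSystem("incremental")
  val worker = system.actorOf(Props(new FibWorker(batch = 100)))
  implicit val timeout: Timeout = Timeout(5.seconds)

  worker ! Work(200000)
  // Answered between batches; depending on timing it may report 0 to 200000 steps
  val (steps, _) = Await.result((worker ? GetCurrentResult).mapTo[(Int, BigInt)], 5.seconds)
  println(s"intermediate result after $steps steps")
  system.terminate()
}
```

Because every batch goes back through the mailbox, a query that arrives mid-computation only waits for the batches already queued ahead of it, not for the whole computation.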

EDIT: more background

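When an actor sends messages to itself, Akka's internal processing essentially runs them in a while loop: the number of messages processed in one go is determined by the throughput setting of the actor's dispatcher (default 5). After that many messages have been processed, the thread is returned to the pool and the continuation is enqueued at the dispatcher as a new task. Hence the solution above has two tunables: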
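  • Process several steps per message (if the individual processing steps are very small)
  • Increase the throughput setting, trading fairness for throughput

The original question seems to have hundreds of such actors running, presumably on commodity hardware that does not have hundreds of CPUs, so the throughput setting should probably be chosen such that each batch takes no longer than about 10 ms.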
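The 10 ms guideline can be turned into a number. As a rough sketch, assuming you have measured or estimated how long one self-sent message takes to process, the hypothetical helper below divides the batch budget by that cost to get a throughput value and renders a dispatcher section using Akka's throughput key. ThroughputBudget and the dispatcher name used in the example are made up for this illustration.

```scala
// Hypothetical helper (not part of Akka): pick a dispatcher throughput so that one
// batch of self-sent messages stays within a time budget (10 ms by default).
object ThroughputBudget {
  def throughputFor(stepNanos: Long, budgetNanos: Long = 10L * 1000 * 1000): Int = {
    require(stepNanos > 0, "stepNanos must be positive")
    // at least one message per batch, and never more than an Int can hold
    math.min(Int.MaxValue.toLong, math.max(1L, budgetNanos / stepNanos)).toInt
  }

  // Renders a dispatcher configuration section with the chosen throughput
  def dispatcherConfig(name: String, throughput: Int): String =
    s"""$name {
       |  type = Dispatcher
       |  executor = "fork-join-executor"
       |  throughput = $throughput
       |}""".stripMargin
}
```

For example, at 0.5 ms per message, throughputFor(500000L) suggests 20. The rendered section could be parsed with ConfigFactory.parseString(...).withFallback(ConfigFactory.load()), passed to ActorSystem(name, config), and the worker created with Props(...).withDispatcher("incremental-dispatcher") so that only these actors use the larger batches.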

Performance assessment

Let's play a bit with the Fibonacci sequence:
    Welcome to Scala version 2.10.0-RC1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_07).
    Type in expressions to have them evaluated.
    Type :help for more information.

    scala> def fib(x1: BigInt, x2: BigInt, steps: Int): (BigInt, BigInt) = if(steps>0) fib(x2, x1+x2, steps-1) else (x1, x2)
    fib: (x1: BigInt, x2: BigInt, steps: Int)(BigInt, BigInt)

    scala> def time(code: =>Unit) { val start = System.currentTimeMillis; code; println("code took " + (System.currentTimeMillis - start) + "ms") }
    time: (code: => Unit)Unit

    scala> time(fib(1, 1, 1000))
    code took 1ms

    scala> time(fib(1, 1, 1000))
    code took 1ms

    scala> time(fib(1, 1, 10000))
    code took 5ms

    scala> time(fib(1, 1, 100000))
    code took 455ms

    scala> time(fib(1, 1, 1000000))
    code took 17172ms

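This means that, in what is presumably a very well-optimized loop, fib_100000 takes half a second. Now let's play with an actor. In the session below, Cont(steps, batch) drives the computation: a negative batch resets the state to (1, 1) and starts a run; every following message advances the state by batch Fibonacci steps and sends Cont(steps - 1, batch) to self; once steps reaches 0, the state is sent to the inbox me. The imports of akka.actor.ActorDSL._ and scala.concurrent.duration._, and the implicit ActorSystem that inbox() and actor(...) require, are presumably part of the session but not shown: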
    scala> case class Cont(steps: Int, batch: Int)
    defined class Cont

    scala> val me = inbox()
    me: akka.actor.ActorDSL.Inbox = akka.actor.dsl.Inbox$Inbox@32c0fe13

    scala> val a = actor(new Act {
         |   var s: (BigInt, BigInt) = _
         |   become {
         |     case Cont(x, y) if y < 0 => s = (1, 1); self ! Cont(x, -y)
         |     case Cont(x, y) if x > 0 => s = fib(s._1, s._2, y); self ! Cont(x - 1, y)
         |     case _: Cont => me.receiver ! s
         |   }
         | })
    a: akka.actor.ActorRef = Actor[akka://repl/user/$c]

    scala> time{a ! Cont(1000, -1); me.receive(10 seconds)}
    code took 4ms

    scala> time{a ! Cont(10000, -1); me.receive(10 seconds)}
    code took 27ms

    scala> time{a ! Cont(100000, -1); me.receive(10 seconds)}
    code took 632ms

    scala> time{a ! Cont(1000000, -1); me.receive(30 seconds)}
    code took 17936ms

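This is already interesting: given enough processing time per step (with truly huge BigInts behind the scenes in the last line), the actor adds little overhead. Now let's see what happens when smaller computations are done in a more batched way, here 10000 messages of 10 steps each, i.e. again fib_100000: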
    scala> time{a ! Cont(10000, -10); me.receive(30 seconds)}
    code took 462ms

This is very close to the result of the direct variant above (455 ms).
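The gap between the actor timings and the direct timings can be turned into a rough per-message cost. The sketch below only does that arithmetic on the numbers reported in the session; they are single runs timed with millisecond resolution, so the results are indicative at best. ActorOverhead and perMessageNanos are names made up for this illustration.

```scala
// Rough per-message cost of sending to self, derived from the session's timings:
// (actor time - direct time) / number of messages the actor sent to itself.
object ActorOverhead {
  // (label, messages to self, direct-loop ms, actor ms), copied from the session
  val runs: Seq[(String, Int, Long, Long)] = Seq(
    ("fib_1000, 1 step/msg", 1000, 1L, 4L),
    ("fib_10000, 1 step/msg", 10000, 5L, 27L),
    ("fib_100000, 1 step/msg", 100000, 455L, 632L),
    ("fib_1000000, 1 step/msg", 1000000, 17172L, 17936L),
    ("fib_100000, 10 steps/msg", 10000, 455L, 462L)
  )

  // Extra wall-clock time per self-sent message, in nanoseconds
  def perMessageNanos(messages: Int, directMs: Long, actorMs: Long): Double =
    (actorMs - directMs) * 1e6 / messages

  def main(args: Array[String]): Unit =
    for ((label, n, direct, actor) <- runs)
      println(f"$label%-26s ${perMessageNanos(n, direct, actor)}%7.0f ns/message")
}
```

For the one-step runs this works out to about 3 µs, 2.2 µs, 1.8 µs and 0.76 µs per message, and for the batched run about 0.7 µs per message, i.e. roughly 70 ns per Fibonacci step.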

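Conclusion

For almost all applications, sending messages to self is cheap; just keep each actual processing step somewhat longer than a few hundred nanoseconds.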
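One way to act on this advice without hand-tuning the batch size is to bound each message by elapsed time rather than by a fixed step count, so every message does a meaningful amount of work but still returns to the mailbox regularly. This is a swapped-in variation on the answer's batching, sketched under the assumption that the computation can be expressed as a pure step function; TimeSlice and fibStep are hypothetical names. An actor would call TimeSlice.run once per Continue message and then send itself the next Continue with the remaining work.

```scala
import scala.annotation.tailrec

// Hypothetical helper: advance an incremental computation until `maxSteps` steps are
// done or `budgetNanos` of wall-clock time has passed, whichever comes first.
// When maxSteps > 0 at least one step is always taken, so each call makes progress.
object TimeSlice {
  def run[S](state: S, maxSteps: Int, budgetNanos: Long)(step: S => S): (S, Int) = {
    val start = System.nanoTime()
    @tailrec def loop(s: S, done: Int): (S, Int) =
      if (done >= maxSteps) (s, done)
      // compare elapsed time, which avoids overflow for very large budgets
      else if (done > 0 && System.nanoTime() - start >= budgetNanos) (s, done)
      else loop(step(s), done + 1)
    loop(state, 0)
  }

  // One Fibonacci step on the (x1, x2) pair, like one iteration of fib in the session
  val fibStep: ((BigInt, BigInt)) => (BigInt, BigInt) = { case (a, b) => (b, a + b) }
}
```

For example, TimeSlice.run((BigInt(1), BigInt(1)), 5, 1000000000L)(TimeSlice.fibStep) performs five steps and returns ((8, 13), 5), as long as the one-second budget is not exhausted first. Calling it repeatedly with the remaining step count reaches the same result as one uninterrupted loop.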

Regarding scala - Incremental processing in an akka actor, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/12851996/
