gpt4 book ai didi

scala - akka:组合来自多个 child 的消息的模式

转载 作者:行者123 更新时间:2023-12-04 06:17:26 24 4
gpt4 key购买 nike

这是我遇到的模式:

一个 Actor A有多个 child C1 , ..., Cn .收到消息后,A将它发送给它的每个 child ,每个 child 都对消息进行一些计算,并在完成后将其发送回 A . A然后想把所有 child 的结果结合起来传递给另一个 Actor 。

这个问题的解决方案是什么样的?或者这是一种反模式?在这种情况下应该如何解决这个问题?

这是一个简单的例子,希望能说明我目前的解决方案。我担心的是重复代码(达到对称性);不能很好地扩展到“很多” child ;并且很难看出发生了什么。

import akka.actor.{Props, Actor}

case class Tagged[T](value: T, id: Int)

class A extends Actor {
import C1._
import C2._

val c1 = context.actorOf(Props[C1], "C1")
val c2 = context.actorOf(Props[C2], "C2")
var uid = 0
var c1Results = Map[Int, Int]()
var c2Results = Map[Int, Int]()

def receive = {
case n: Int => {
c1 ! Tagged(n, uid)
c2 ! Tagged(n, uid)
uid += 1
}
case Tagged(C1Result(n), id) => c2Results get id match {
case None => c1Results += (id -> n)
case Some(m) => {
c2Results -= id
context.parent ! (n, m)
}
}
case Tagged(C2Result(n), id) => c1Results get id match {
case None => c2Results += (id -> n)
case Some(m) => {
c1Results -= id
context.parent ! (m, n)
}
}
}
}

class C1 extends Actor {
import C1._

def receive = {
case Tagged(n: Int, id) => Tagged(C1Result(n), id)
}
}

object C1 {
case class C1Result(n: Int)
}

class C2 extends Actor {
import C2._

def receive = {
case Tagged(n: Int, id) => Tagged(C2Result(n), id)
}
}

object C2 {
case class C2Result(n: Int)
}

如果您认为代码看起来很糟糕,请放轻松,我刚刚开始学习 akka ;)

最佳答案

在许多或不同数量的 child Actor 的情况下,ask pattern Zim-Zam 的建议很快就会失控。

aggregator pattern旨在帮助解决这种情况。它提供了一个聚合器特性,您可以在参与者中使用它来执行聚合逻辑。

想要执行聚合的客户端参与者可以启动基于聚合器的参与者实例并向其发送将启动聚合过程的消息。

应该为每个聚合操作创建一个新的聚合器,并在发回结果时终止(当它收到所有响应或超时时)。

下面列出了此模式的一个示例,用于对由 Child 类表示的参与者持有的整数值求和。 (请注意,它们不需要都是由同一个父 actor 监督的子类:SummationAggregator 只需要一个 ActorRefs 的集合。)

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import akka.actor._
import akka.contrib.pattern.Aggregator

object Child {
def props(value: Int): Props = Props(new Child(value))

case object GetValue
case class GetValueResult(value: Int)
}

class Child(value: Int) extends Actor {
import Child._

def receive = { case GetValue => sender ! GetValueResult(value) }
}

object SummationAggregator {
def props = Props(new SummationAggregator)

case object TimedOut
case class StartAggregation(targets: Seq[ActorRef])
case object BadCommand
case class AggregationResult(sum: Int)
}

class SummationAggregator extends Actor with Aggregator {
import Child._
import SummationAggregator._

expectOnce {
case StartAggregation(targets) =>
// Could do what this handler does in line but handing off to a
// separate class encapsulates the state a little more cleanly
new Handler(targets, sender())
case _ =>
sender ! BadCommand
context stop self
}

class Handler(targets: Seq[ActorRef], originalSender: ActorRef) {
// Could just store a running total and keep track of the number of responses
// that we are awaiting...
var valueResults = Set.empty[GetValueResult]

context.system.scheduler.scheduleOnce(1.second, self, TimedOut)

expect {
case TimedOut =>
// It might make sense to respond with what we have so far if some responses are still awaited...
respondIfDone(respondAnyway = true)
}

if (targets.isEmpty)
respondIfDone()
else
targets.foreach { t =>
t ! GetValue
expectOnce {
case vr: GetValueResult =>
valueResults += vr
respondIfDone()
}
}

def respondIfDone(respondAnyway: Boolean = false) = {
if (respondAnyway || valueResults.size == targets.size) {
originalSender ! AggregationResult(valueResults.foldLeft(0) { case (acc, GetValueResult(v)) => acc + v })
context stop self
}
}
}
}

要从您的父 Actor 使用此 SummationAggregator,您可以执行以下操作:
context.actorOf(SummationAggregator.props) ! StartAggregation(children)

然后在父接收的某处处理 AggregationResult。

关于scala - akka:组合来自多个 child 的消息的模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30791915/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com