gpt4 book ai didi

akka - 在 Akka 中等待多个结果

转载 作者:行者123 更新时间:2023-12-04 11:06:47 29 4
gpt4 key购买 nike

在Akka中等待多个actor的结果的正确方法是什么?

Principles of Reactive Programming Coursera 类(class)有一个带有复制键值存储的练习。无需深入研究分配的细节,它需要等待多个参与者的确认,然后才能表明复制已完成。

我使用包含未完成请求的可变映射实现了分配,但我觉得该解决方案有一种“难闻的味道”。我希望有更好的方法来实现看似常见的场景。

为了通过保留我对练习的解决方案来维护类(class)的荣誉准则,我有一个描述类似问题的抽象用例。

发票行项目需要计算其纳税义务。纳税义务是跨多个税务机关(例如,联邦、州、警区)应用于该行项目的所有税款的组合。如果每个税务机关都是能够确定行项目纳税义务的参与者,则行项目需要所有参与者进行报告,然后才能继续报告总体纳税义务。在 Akka 中完成此场景的最佳/正确方法是什么?

最佳答案

这是我相信您正在寻找的简化示例。它展示了像 Actor 这样的主人如何产生一些子 worker ,然后等待他们的所有响应,处理可能发生超时等待结果的情况。该解决方案展示了如何等待初始请求,然后在等待响应时切换到新的接收函数。它还展示了如何将状态传播到等待接收函数中以避免必须在实例级别具有显式可变状态。

object TaxCalculator {
sealed trait TaxType
case object StateTax extends TaxType
case object FederalTax extends TaxType
case object PoliceDistrictTax extends TaxType
val AllTaxTypes:Set[TaxType] = Set(StateTax, FederalTax, PoliceDistrictTax)

case class GetTaxAmount(grossEarnings:Double)
case class TaxResult(taxType:TaxType, amount:Double)

case class TotalTaxResult(taxAmount:Double)
case object TaxCalculationTimeout
}

class TaxCalculator extends Actor{
import TaxCalculator._
import context._
import concurrent.duration._

def receive = waitingForRequest

def waitingForRequest:Receive = {
case gta:GetTaxAmount =>
val children = AllTaxTypes map (tt => actorOf(propsFor(tt)))
children foreach (_ ! gta)
setReceiveTimeout(2 seconds)
become(waitingForResponses(sender, AllTaxTypes))
}

def waitingForResponses(respondTo:ActorRef, expectedTypes:Set[TaxType], taxes:Map[TaxType, Double] = Map.empty):Receive = {
case TaxResult(tt, amount) =>
val newTaxes = taxes ++ Map(tt -> amount)
if (newTaxes.keySet == expectedTypes){
respondTo ! TotalTaxResult(newTaxes.values.foldLeft(0.0)(_+_))
context stop self
}
else{
become(waitingForResponses(respondTo, expectedTypes, newTaxes))
}

case ReceiveTimeout =>
respondTo ! TaxCalculationTimeout
context stop self
}

def propsFor(taxType:TaxType) = taxType match{
case StateTax => Props[StateTaxCalculator]
case FederalTax => Props[FederalTaxCalculator]
case PoliceDistrictTax => Props[PoliceDistrictTaxCalculator]
}
}

trait TaxCalculatingActor extends Actor{
import TaxCalculator._
val taxType:TaxType
val percentage:Double

def receive = {
case GetTaxAmount(earnings) =>
val tax = earnings * percentage
sender ! TaxResult(taxType, tax)
}
}

class FederalTaxCalculator extends TaxCalculatingActor{
val taxType = TaxCalculator.FederalTax
val percentage = 0.20
}

class StateTaxCalculator extends TaxCalculatingActor{
val taxType = TaxCalculator.StateTax
val percentage = 0.10
}

class PoliceDistrictTaxCalculator extends TaxCalculatingActor{
val taxType = TaxCalculator.PoliceDistrictTax
val percentage = 0.05
}

然后,您可以使用以下代码对此进行测试:

import TaxCalculator._
import akka.pattern.ask
import concurrent.duration._
implicit val timeout = Timeout(5 seconds)

val system = ActorSystem("taxes")
import system._
val cal = system.actorOf(Props[TaxCalculator])
val fut = cal ? GetTaxAmount(1000.00)
fut onComplete{
case util.Success(TotalTaxResult(amount)) =>
println(s"Got tax total of $amount")
case util.Success(TaxCalculationTimeout) =>
println("Got timeout calculating tax")
case util.Failure(ex) =>
println(s"Got exception calculating tax: ${ex.getMessage}")
}

关于akka - 在 Akka 中等待多个结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22770927/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com