gpt4 book ai didi

scala - Akka 和并发 Actor 执行

转载 作者:行者123 更新时间:2023-12-02 18:20:48 27 4
gpt4 key购买 nike

我有一个 Actor(称为 Worker),它向另外 3 个 Actor(称为 Filter1、Filter2、Filter3)发送相同的消息

每个过滤器都有一个随机时间来解决此操作。然后,在 Worker actor 中,我使用询问模式并等待 future 的成功:

class Worker2 extends Actor with ActorLogging {

val filter1 = context.actorOf(Props[Filter1], "filter1")
val filter2 = context.actorOf(Props[Filter2], "filter2")
val filter3 = context.actorOf(Props[Filter3], "filter3")

implicit val timeout = Timeout(100.seconds)

def receive = {
case Work(t) =>

val futureF3 = (filter3 ? Work(false)).mapTo[Response]
val futureF2 = (filter2 ? Work(true)).mapTo[Response]
val futureF1 = (filter1 ? Work(true)).mapTo[Response]

val aggResult: Future[Boolean] =
for {
f3 <- futureF3
f2 <- futureF2
f1 <- futureF1
} yield f1.reponse && f2.reponse && f3.reponse

if (Await.result(aggResult, timeout.duration)) {
log.info("Response: true")
sender ! Response(true)
} else {
log.info("Response: false")
sender ! Response(false)
}
}
}

如果任何过滤器参与者返回 false,那么我不需要其他答案。例如,如果我并行运行 3 个 Filter Actor,如果在一种情况下 Filter1 响应为 false,则工作已解决,我不需要 Filter2 和 Filter3 的答案。

在这段代码中,我总是需要等待 3 次执行才能决定,这似乎没有必要。有没有办法设置短路?

最佳答案

解决这个问题的方法是使用 Future.find() -- Scaladoc Here

你可以这样解决:

val failed = Future.find([f1,f2,f3]) { res => !res }
Await.result(failed, timeout.duration) match {
None => // Success
_ => // Failed
}

Future.find() 将返回第一个完成并匹配谓词的 future。如果所有 future 都已完成并且没有结果与谓词匹配,则返回 None。

编辑:

更好的解决方案是防止全部阻塞,并在找到响应时使用 akka 管道功能将结果直接传送给发送者。这样你就不会阻塞使用这个角色的线程:

import akka.pattern.pipe

val failed = Future.find([f1,f2,f3]) { res => !res }
val senderRef = sender
failed.map(res => Response(res.getOrElse(true))).pipeTo(senderRef)

在 getOrElse(true) 部分,如果我们像以前一样找到 future ,则结果为 false,否则返回 true。

关于scala - Akka 和并发 Actor 执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17863501/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com