gpt4 book ai didi

scala - 在失败的情况下从询问中解决 Akka future

转载 作者:行者123 更新时间:2023-12-04 03:15:22 30 4
gpt4 key购买 nike

我正在使用 Spray 应用程序中的 ask 模式调用 Actor,并将结果作为 HTTP 响应返回。我将参与者的失败映射到自定义错误代码。

val authActor = context.actorOf(Props[AuthenticationActor])

callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user =>
complete(StatusCodes.OK, user)
}

def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = {
onComplete(f) {
case Success(value: T) => cb(value)
case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage)
case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.")
//In reality this returns a custom error object.
}
}

这在 authActor 发送失败时正常工作,但如果 authActor 抛出异常,则在询问超时完成之前不会发生任何事情。例如:
override def receive: Receive = {
case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token")
}

我知道 Akka 文档是这么说的

To complete the future with an exception you need send a Failure message to the sender. This is not done automatically when an actor throws an exception while processing a message.



但是考虑到我在 Spray 路由 Actor 和服务 Actor 之间使用了很多接口(interface),我宁愿不使用 try/catch 包装每个子 Actor 的接收部分。有没有更好的方法来实现子actor中异常的自动处理,并在发生异常时立即解决future?

编辑:这是我目前的解决方案。但是,对每个 child Actor 都这样做是相当困惑的。
override def receive: Receive = {
case default =>
try {
default match {
case _ => throw new ServiceException("")//Actual code would go here
}
}
catch {
case se: ServiceException =>
logger.error("Service error raised:", se)
sender ! Failure(se)
case ex: Exception =>
sender ! Failure(ex)
throw ex
}
}

这样,如果它是一个预期的错误(即 ServiceException),它会通过创建一个失败来处理。如果它是意外的,它会立即返回一个失败,以便解决 future ,但随后抛出异常,以便仍然可以由 SupervisorStrategy 处理。

最佳答案

如果您想要一种在发生意外异常时自动将响应发送回发件人的方法,那么这样的事情可能对您有用:

trait FailurePropatingActor extends Actor{
override def preRestart(reason:Throwable, message:Option[Any]){
super.preRestart(reason, message)
sender() ! Status.Failure(reason)
}
}

我们覆盖 preRestart并将故障作为 Status.Failure 传播回发送者这将导致上游 Future失败。此外,调用 super.preRestart 也很重要。在这里,因为那是 child 停下来的地方。在 Actor 中使用它看起来像这样:
case class GetElement(list:List[Int], index:Int)
class MySimpleActor extends FailurePropatingActor {
def receive = {
case GetElement(list, i) =>
val result = list(i)
sender() ! result
}
}

如果我像这样调用这个 Actor 的一个实例:
import akka.pattern.ask
import concurrent.duration._

val system = ActorSystem("test")
import system.dispatcher
implicit val timeout = Timeout(2 seconds)
val ref = system.actorOf(Props[MySimpleActor])
val fut = ref ? GetElement(List(1,2,3), 6)

fut onComplete{
case util.Success(result) =>
println(s"success: $result")

case util.Failure(ex) =>
println(s"FAIL: ${ex.getMessage}")
ex.printStackTrace()
}

然后它会正确地击中我的 Failure堵塞。现在,该基本特征中的代码在 Futures 时运行良好。不参与扩展该特征的 Actor ,就像这里的简单 Actor 。但是如果你使用 Future s 那么你需要小心 Future 中发生的异常不要导致 Actor 重新启动,并且在 preRestart 中,调用 sender()不会返回正确的 ref,因为 actor 已经移动到下一条消息中。像这样的类型转换明了这个问题:
class MyBadFutureUsingActor extends FailurePropatingActor{
import context.dispatcher

def receive = {
case GetElement(list, i) =>
val orig = sender()
val fut = Future{
val result = list(i)
orig ! result
}
}
}

如果我们在之前的测试代码中使用这个actor,我们总是会在失败的情况下得到一个超时。为了缓解这种情况,您需要将 future 的结果通过管道传回发送者,如下所示:
class MyGoodFutureUsingActor extends FailurePropatingActor{
import context.dispatcher
import akka.pattern.pipe

def receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}

fut pipeTo sender()
}
}

在这种特殊情况下,actor 本身不会重新启动,因为它没有遇到未捕获的异常。现在,如果您的 actor 需要在 future 进行一些额外的处理,您可以通过管道返回 self 并在收到 Status.Failure 时显式失败。 :
class MyGoodFutureUsingActor extends FailurePropatingActor{
import context.dispatcher
import akka.pattern.pipe

def receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}

fut.to(self, sender())

case d:Double =>
sender() ! d * 2

case Status.Failure(ex) =>
throw ex
}
}

如果这种行为变得普遍,您可以将其提供给任何需要它的参与者,如下所示:
trait StatusFailureHandling{ me:Actor =>
def failureHandling:Receive = {
case Status.Failure(ex) =>
throw ex
}
}

class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{
import context.dispatcher
import akka.pattern.pipe

def receive = myReceive orElse failureHandling

def myReceive:Receive = {
case GetElement(list, i) =>
val fut = Future{
list(i)
}

fut.to(self, sender())

case d:Double =>
sender() ! d * 2
}
}

关于scala - 在失败的情况下从询问中解决 Akka future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29794454/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com