gpt4 book ai didi

scala - Akka Ask 与定时重试

转载 作者:行者123 更新时间:2023-12-01 17:54:39 26 4
gpt4 key购买 nike

这是我编写的一个简单函数,用于通过定时重试执行 Akka“询问”。有一个明显的竞争条件,我不确定如何解决。

def askWithRetry(actor: ActorRef, message: Any, timeout: Timeout): Future[Any] =
(actor ? message)(timeout) recoverWith { case e: AskTimeoutException =>
// do a retry. currently there is no retry limit for simplicity.
askWithRetry(actor, message, timeout)
}

通常情况下,这是有效的。 “ask”或 ? 为每个调用创建一个临时中间角色。如果目标发送响应消息,临时“询问参与者”会将结果作为成功完成放入 Future 中。如果目标没有及时响应,future 将完成并出现超时异常,并且recoverWith 进程将重试。

但是,存在竞争条件。如果目标将响应消息发送给临时“ask actor”,但在响应消息之前处理了超时,则响应消息将丢失。重试过程使用新的临时参与者重新发送新请求。由于响应消息已发送到之前的临时“ask actor”,而该临时“ask actor”现已失效,因此它将不会被处理并丢失。

我该如何解决这个问题?

我可以编写一个自定义版本的 Ask 模式,并内置重试逻辑来修复此竞争条件...如果有更标准的选项,我讨厌使用不必要的自定义代码。

更新:这是我最终使用的自定义版本:

object AskWithRetry {
def askWithRetry(context: ActorContext, actor: ActorRef, message: Any, retryInterval: Duration, maxRetries: Option[Int]): Future[Any] = {
val p = Promise[Any]

val intermediate = context.actorOf(props(p, actor, message, retryInterval, maxRetries))

p.future
}

def props(promise: Promise[Any], target: ActorRef, message: Any, retryInterval: Duration, maxRetries: Option[Int]): Props =
Props(new AskWithRetryIntermediateActor(promise, target, message, retryInterval, maxRetries))
}

class AskWithRetryIntermediateActor(promise: Promise[Any], target: ActorRef, message: Any, retryInterval: Duration, var maxRetries: Option[Int]) extends Actor {
def doSend(): Unit = target ! message

def receive: Receive = {
case ReceiveTimeout =>
maxRetries match {
case None =>
//println(s"Retrying. Infinite tries left. ${message}")
doSend()
case Some(retryCount) =>
if (retryCount > 0) {
//println(s"Retrying. ${retryCount-1} tries left. ${message}")
maxRetries = Some(retryCount - 1)
doSend()
} else {
//println(s"Exceeded timeout limit. Failing. ${message}")
if (!promise.isCompleted) {
promise.failure(new AskTimeoutException("retry limit reached"))
}
context.stop(self)
}
}
case otherMessage: Any =>
if (!promise.isCompleted) {
//println(s"AskWithRetry: completing ${otherMessage}")
promise.success(otherMessage)
}
context.stop(self)
}

context.setReceiveTimeout(retryInterval)
doSend()
}

最佳答案

我认为你的直觉很好。如果您想要自定义参与者逻辑,您应该编写它。

自定义询问等待参与者应将消息发送给参与者,并scheduleOnce自身发送消息以重试。这样,响应和超时都会通过 receive 方法到达,并且您不会发生任何竞争。

关于scala - Akka Ask 与定时重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22137936/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com