gpt4 book ai didi

scala - 如何在Scala中对Future进行投票?

转载 作者:行者123 更新时间:2023-12-04 06:14:23 26 4
gpt4 key购买 nike

我想轮询一个API端点,直到达到某种条件。我希望它能在几秒钟到一分钟内达到此条件。我有一种方法可以调用返回Future的端点。有什么方法可以将Future链接在一起,以便每n毫秒轮询一次此终结点,并在t尝试后放弃?

假设我有一个带有以下签名的函数:

def isComplete(): Future[Boolean] = ???

我认为最简单的方法是使所有内容都阻塞:
def untilComplete(): Unit = {
for { _ <- 0 to 10 } {
val status = Await.result(isComplete(), 1.seconds)
if (status) return Unit
Thread.sleep(100)
}
throw new Error("Max attempts")
}

但这可能会占用所有线程,并且不是异步的。我还考虑了递归执行:
def untilComplete(
f: Future[Boolean] = Future.successful(false),
attempts: Int = 10
): Future[Unit] = f flatMap { status =>
if (status) Future.successful(Unit)
else if (attempts == 0) throw new Error("Max attempts")
else {
Thread.sleep(100)
untilComplete(isComplete(), attempts - 1)
}
}

但是,我担心最大化调用堆栈,因为这不是尾递归。

有更好的方法吗?

编辑:我正在使用akka

最佳答案

您可以使用Akka Streams。例如,每500毫秒调用isComplete直到Future的结果为真,最多调用五次:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._

def isComplete(): Future[Boolean] = ???

implicit val system = ActorSystem("MyExample")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val stream: Future[Option[Boolean]] =
Source(1 to 5)
.throttle(1, 500 millis)
.mapAsync(parallelism = 1)(_ => isComplete())
.takeWhile(_ == false, true)
.runWith(Sink.lastOption)

stream onComplete { result =>
println(s"Stream completed with result: $result")
system.terminate()
}

关于scala - 如何在Scala中对Future进行投票?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55637835/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com