gpt4 book ai didi

scala - 等待所有的Future.onComplete回调执行完毕

转载 作者:行者123 更新时间:2023-12-04 05:24:14 24 4
gpt4 key购买 nike

我正在使用Scala 2.10.X中的Future API。

这是我的用例:

object Class1 {

def apply(f: (Int) => Future[String])(i: Int): Future[String] = {

val start = DateTime.now

val result = f(i)

result.onComplete{
case _ => println("Started at " + start + ", ended at " + DateTime.now)
}

result
}
}

我认为很简单:我正在为我的 future 添加一个onComplete回调。现在,我想知道是否有一种方法可以在onComplete完成执行时添加回调-在此示例中,知道何时完成日志记录。

假设我的 result实例注册了3个 onComplete,我可以知道它们何时全部执行了吗?我认为这是不可能的,但谁知道:)

也许替代方法是调用 map而不是 onComplete来返回 Future的新实例:
def apply(f: (Int) => Future[String])(i: Int): Future[String] = {

val start = DateTime.now

f(i) map {
case r =>
println("Started at " + start + ", ended at " + DateTime.now)
r
}
}

但是我不确定它是否会保持相同的行为。

编辑:只是为了澄清-只有 Future的一个 实例,而我在同一实例上调用了onComplete 3次(嗯,在我的示例中,只调用了一次,但是我说它被调用了N次),并且我想知道由于相同的Future实例的完成而完成3个回调的执行时间。

最佳答案

如果您不想使用其他方法(例如CountDownLatch),则希望使用andThen知道您的操作何时完成(是否成功以及Future是否成功)。

scala> val f = Future(3)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@4b49ca35

scala> val g = f andThen { case Success(i) => println(i) } andThen { case _ => println("All done") }
3
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@1939e13
All done

如果future失败,则不会调用映射的函数:
scala> val f = Future[Int](???)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@7001619b

scala> val g = f andThen { case t => println(s"stage 1 $t") } andThen { case _ => println("All done") }
stage 1 Failure(java.util.concurrent.ExecutionException: Boxed Error)
All done
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@24e1e7e8

scala> val g = f map { case i => println(i) } andThen { case _ => println("All done") }
All done
g: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@5d0f75d6

scala> val g = f map { case i => println(i) } map { case _ => println("All done") }
g: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@5aabe81f

scala> g.value
res1: Option[scala.util.Try[Unit]] = Some(Failure(java.util.concurrent.ExecutionException: Boxed Error))

同样,在链式处理程序中爆炸不会中断后续操作:
scala> val g = f andThen { case t => null.hashCode } andThen { case _ => Thread.sleep(1000L); println("All done") }
java.lang.NullPointerException
at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.applyOrElse(<console>:51)
at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.applyOrElse(<console>:51)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:431)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:430)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@3fb7bec8

scala> All done


scala> g.value
res1: Option[scala.util.Try[Int]] = Some(Success(3))

对于需要等待的不幸情况:
scala> val f = Future[Int](???)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@859a977

scala> import java.util.concurrent.{ CountDownLatch => CDL }
import java.util.concurrent.{CountDownLatch=>CDL}

scala> val latch = new CDL(3)
latch: java.util.concurrent.CountDownLatch = java.util.concurrent.CountDownLatch@11683e9f[Count = 3]

scala> f onComplete { _ => println(1); latch.countDown() }
1

scala> f onComplete { _ => println(2); latch.countDown() }
2

scala> f onComplete { _ => println(3); latch.countDown() }
3

scala> f onComplete { _ => latch.await(); println("All done") }
All done

关于scala - 等待所有的Future.onComplete回调执行完毕,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21188012/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com