gpt4 book ai didi

multithreading - Scala Future/Promise 快速失败管道

转载 作者:行者123 更新时间:2023-12-04 08:27:23 25 4
gpt4 key购买 nike

我想并行启动两个或多个 Future/Promise 并且即使启动的 Future/Promise 之一失败也失败并且不想等待其余的完成。
在 Scala 中组合这个管道的最惯用的方法是什么?

编辑:更多上下文信息。

我必须启动两个外部进程,一个写入 fifo 文件,另一个从中读取。假设 writer 进程失败;读取器线程可能会永远挂起,等待来自文件的任何输入。所以我想同时启动这两个进程和 快速失败 即使 Future/Promise 之一在没有等待另一个完成的情况下失败。

下面是更精确的示例代码。命令不完全是 cattail .为简洁起见,我使用了它们。

val future1 = Future { executeShellCommand("cat file.txt > fifo.pipe") }
val future2 = Future { executeShellCommand("tail fifo.pipe") }

最佳答案

如果我正确理解了这个问题,我们正在寻找的是一个快速失败的序列实现,它类似于 firstCompletedOf 的失败偏向版本。

在这里,我们急切地注册一个失败回调,以防其中一个 future 提前失败,确保我们在任何 future 失败时立即失败。

import scala.concurrent.{Future, Promise}
import scala.util.{Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global
def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
val promise = Promise[Seq[T]]
futures.foreach{f => f.onFailure{case ex => promise.failure(ex)}}
val res = Future.sequence(futures)
promise.completeWith(res).future
}

对比 Future.sequence ,无论顺序如何,只要任何 future 失败,此实现就会失败。
让我们用一个例子来说明:
import scala.util.Try
// help method to measure time
def resilientTime[T](t: =>T):(Try[T], Long) = {
val t0 = System.currentTimeMillis
val res = Try(t)
(res, System.currentTimeMillis-t0)
}

import scala.concurrent.duration._
import scala.concurrent.Await

第一个 future 将失败(2秒内失败)
val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val res = failFast(Seq(f1,f2,f3))

resilientTime(Await.result(res, 10.seconds))
// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)

最后的 future 将失败。故障也在2秒内。 (注意序列构造中的顺序)
val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val res = failFast(Seq(f3,f2,f1))

resilientTime(Await.result(res, 10.seconds))
// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)

对比 Future.sequence其中失败取决于排序(10 秒后失败):
val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val seq = Seq(f3,f2,f1)

resilientTime(Await.result(Future.sequence(seq), 10.seconds))
//res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),10000)

关于multithreading - Scala Future/Promise 快速失败管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39437457/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com