
multithreading - Scala: joining/waiting for a growing queue of futures

Reposted. Author: 行者123. Updated: 2023-12-03 12:54:40

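I launch several asynchronous processes which, in turn, can launch more processes when needed (think of traversing a directory structure or something similar). Each process returns something, and in the end I want to wait for all of them to complete and schedule a function that processes the resulting collection.

Naive attempt

My attempted solution used a mutable ListBuffer (to which I keep adding the futures I spawn) and Future.sequence to schedule some function to run once all the futures listed in this buffer have completed.

I prepared a minimal example that illustrates the problem: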

    import scala.collection.mutable.ListBuffer
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    object FuturesTest extends App {
      var queue = ListBuffer[Future[Int]]()

      val f1 = Future {
        Thread.sleep(1000)
        // f3 is spawned from inside f1, one second after startup
        val f3 = Future {
          Thread.sleep(2000)
          Console.println(s"f3: 1+2=3 sec; queue = $queue")
          3
        }
        queue += f3
        Console.println(s"f1: 1 sec; queue = $queue")
        1
      }
      val f2 = Future {
        Thread.sleep(2000)
        Console.println(s"f2: 2 sec; queue = $queue")
        2
      }

      queue += f1
      queue += f2
      Console.println(s"starting; queue = $queue")

      Future.sequence(queue).foreach(
        (all) => Console.println(s"Future.sequence finished with $all")
      )

      Thread.sleep(5000) // simulates app being alive later
    }

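It schedules the f1 and f2 futures first; f3 is then scheduled 1 second later, when f1 resolves, and f3 itself resolves 2 seconds after that. So I expected to get the following: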
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2, 3)

However, what I actually get is:
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2)
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))

...which is most likely because the list of futures being waited on is fixed at the time of the initial call to Future.sequence and does not change afterwards.
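The snapshot behaviour described above can be reproduced in isolation. The following is only a minimal sketch, assuming a JVM target: the object name SequenceSnapshotDemo and the use of Promise are illustrative and not part of the original question, and Await.result appears solely to keep the demo short (the question rules it out for Scala.js).

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object, not taken from the question.
object SequenceSnapshotDemo {
  // Returns the result of a Future.sequence call made while the queue
  // held two futures, although a third is appended before any completes.
  def run(): Seq[Int] = {
    val p1, p2, p3 = Promise[Int]()
    val queue = ListBuffer[Future[Int]](p1.future, p2.future)

    // Future.sequence walks the buffer at call time; later appends are not seen.
    val combined = Future.sequence(queue)

    queue += p3.future // added too late to be part of `combined`

    p1.success(1)
    p2.success(2)
    // p3 is never completed, yet `combined` still finishes.

    // Blocking is for this JVM-only demo; it is not an option on Scala.js.
    Await.result(combined, 2.seconds).toList
  }
}
```

Calling SequenceSnapshotDemo.run() yields only the two values that were in the buffer when Future.sequence was invoked, which matches the output observed in the question.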

Working, but ugly attempt

Eventually, I got it to do what I want with the following code:
    import scala.util.{Failure, Success}

    waitForSequence(queue, (all: ListBuffer[Int]) => Console.println(s"finished with $all"))

    def waitForSequence[T](queue: ListBuffer[Future[T]], act: (ListBuffer[T] => Unit)): Unit = {
      val seq = Future.sequence(queue)
      seq.onComplete {
        case Success(res) =>
          // The queue grew while we were waiting: start another round
          if (res.size < queue.size) {
            Console.println("... still waiting for tasks")
            waitForSequence(queue, act)
          } else {
            act(res)
          }
        case Failure(exc) =>
          throw exc
      }
    }

This works as expected and eventually picks up all 3 futures:
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
... still waiting for tasks
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
finished with ListBuffer(1, 2, 3)

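But this is still very ugly. If it sees that the queue is longer than the number of results at completion time, it simply restarts the Future.sequence wait, hoping that things will look better the next time it completes. Of course, this is bad because it exhausts the stack, and it is error-prone if the check happens to fire in the tiny window between creating a future and appending it to the queue.

Is it possible to do this without rewriting everything with Akka, and without using Await.result (which I can't actually use, because my code is compiled for Scala.js)?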

Best answer

As Justin mentioned, you must not lose the reference to futures spawned inside other futures; chain them with map and flatMap instead.

    val f1 = Future {
      Thread.sleep(1000)
      val f3 = Future {
        Thread.sleep(2000)
        Console.println(s"f3: 1+2=3 sec")
        3
      }
      // Return f3 (combined with f1's own result) instead of dropping it
      f3.map {
        r =>
          Console.println(s"f1: 1 sec;")
          Seq(1, r)
      }
    }.flatMap(identity) // flatten Future[Future[Seq[Int]]] into Future[Seq[Int]]

    val f2 = Future {
      Thread.sleep(2000)
      Console.println(s"f2: 2 sec;")
      Seq(2)
    }

    val futures = Seq(f1, f2)

    Future.sequence(futures).foreach(
      (all) => Console.println(s"Future.sequence finished with ${all.flatten}")
    )

    Thread.sleep(5000) // simulates app being alive later

This works for the minimal example; I am not sure whether it will work for your real use case. The result is:
f2: 2 sec;
f3: 1+2=3 sec
f1: 1 sec;
Future.sequence finished with List(1, 3, 2)
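For the directory-traversal style of use case mentioned in the question, the same chaining idea might be generalised recursively. This is only a sketch under stated assumptions: Node, TreeCollect and collect are hypothetical names introduced here for illustration, and Future(node.value) stands in for whatever asynchronous work a real task would do.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical tree type standing in for a directory structure.
final case class Node(value: Int, children: List[Node] = Nil)

// Hypothetical helper, not from the original answer.
object TreeCollect {
  // A node's future completes only after its own work and all of its
  // children's futures have completed, so no spawned future is lost.
  def collect(node: Node)(implicit ec: ExecutionContext): Future[List[Int]] =
    Future(node.value).flatMap { own =>            // this node's own async work
      Future
        .sequence(node.children.map(child => collect(child))) // spawn children, keep their futures
        .map(childResults => own :: childResults.flatten)     // own result first, then descendants
    }
}
```

With an implicit ExecutionContext in scope, TreeCollect.collect(Node(1, List(Node(2, List(Node(4))), Node(3)))) completes with List(1, 2, 4, 3). No shared mutable queue is involved, so there is nothing to re-check after completion, and no blocking call is required.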

Regarding multithreading - Scala: joining/waiting for a growing queue of futures, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/42695985/
