gpt4 book ai didi

scala - 如何将 future 的结果高效组合为 future

转载 作者:行者123 更新时间:2023-12-05 05:11:01 25 4
gpt4 key购买 nike

我有很多计算贡献一个最终结果,贡献的顺序没有限制。似乎 Futures 应该能够加快速度,他们确实这样做了,但不是我想象的那样。这是比较一种非常愚蠢的整数除法性能的代码:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object scale_me_up {
def main(args: Array[String]) {
val M = 500 * 1000
val N = 5
Thread.sleep(3210) // let launcher settle down
for (it <- 0 until 15) {
val method = it % 3
val start = System.currentTimeMillis()
val result = divide(M, N, method)
val elapsed = System.currentTimeMillis() - start
assert(result == M / N)
if (it >= 6) {
val methods = Array("ordinary", "fast parallel", "nice parallel")
val name = methods(method)
println(f"$name%15s: $elapsed ms")
}
}
}

def is_multiple_of(m: Int, n: Int): Boolean = {
val result = !(1 until n).map(_ + (m / n) * n).toSet.contains(m)
assert(result == (m % n == 0)) // yes, a less crazy implementation exists
result
}

def divide(m: Int, n: Int, method: Int): Int = {
method match {
case 0 =>
(1 to m).count(is_multiple_of(_, n))
case 1 =>
(1 to m)
.map { x =>
Future { is_multiple_of(x, n) }
}
.count(Await.result(_, Duration.Inf))
case 2 =>
Await.result(divide_futuristically(m, n), Duration.Inf)
}
}

def divide_futuristically(m: Int, n: Int): Future[Int] = {
val futures = (1 to m).map { x =>
Future { is_multiple_of(x, n) }
}
Future.foldLeft(futures)(0) { (count, flag) =>
{ if (flag) { count + 1 } else { count } }
}
/* much worse performing alternative:
Future.sequence(futures).map(_.count(identity))
*/
}
}

当我运行它时,并行 case 1 比普通的 case 0 代码快一些(欢呼),但是 case 2 需要两倍长。当然,这取决于系统以及每个 future 是否需要完成足够的工作(此处随分母 N 增长)以抵消并发开销。 [PS] 正如预期的那样,减少 N 使 case 0 领先,增加 N 足以使 case 1case 2 快两倍case 0 在我的双核 CPU 上。

我相信 divide_futuristically 是表达这种计算的更好方式:返回带有组合结果的 future 。阻塞正是​​我们在这里衡量性能所需要的。但实际上,堵得越多,大家吃完的速度也就越快。我究竟做错了什么?总结 future 的几种选择(如 sequence )都会受到相同的惩罚。

[PPS] 这是在 2 核 CPU 上运行在 Java 11 上的 Scala 2.12。对于 6 核 CPU 上的 Java 12,差异要小得多(尽管使用 sequence 的替代方案仍然拖后腿)。使用 Scala 2.13,差异甚至更小,随着每次迭代的工作量增加,divide_futuristically 开始超越竞争对手。 future 终于来了...

最佳答案

看来你做的一切都是对的。我自己尝试了不同的方法,甚至 .par 但得到了相同或更差的结果。

我深入研究了 Future.foldLeft 并试图分析导致延迟的原因:

  /** A non-blocking, asynchronous left fold over the specified futures,
* with the start value of the given zero.
* The fold is performed asynchronously in left-to-right order as the futures become completed.
* The result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
*
* Example:
* {{{
* val futureSum = Future.foldLeft(futures)(0)(_ + _)
* }}}
*
* @tparam T the type of the value of the input Futures
* @tparam R the type of the value of the returned `Future`
* @param futures the `scala.collection.immutable.Iterable` of Futures to be folded
* @param zero the start value of the fold
* @param op the fold operation to be applied to the zero and futures
* @return the `Future` holding the result of the fold
*/
def foldLeft[T, R](futures: scala.collection.immutable.Iterable[Future[T]])(zero: R)(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] =
foldNext(futures.iterator, zero, op)

private[this] def foldNext[T, R](i: Iterator[Future[T]], prevValue: R, op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] =
if (!i.hasNext) successful(prevValue)
else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) }

这部分:

else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) }

.flatMap 生成一个新的 Future 提交给 executor。换句话说,每一个

    { (count, flag) =>
{ if (flag) { count + 1 } else { count } }
}

作为新的 Future 执行。

我想这部分会导致实验证明的延迟。

关于scala - 如何将 future 的结果高效组合为 future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55992716/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com