gpt4 book ai didi

multithreading - 嵌套 future 导致更差的性能

转载 作者:行者123 更新时间:2023-12-03 13:14:58 25 4
gpt4 key购买 nike

我有一组任务。除了少数大型任务外,每个此类任务通常都相当小。最初我有一个 Future每个任务。然而,这导致我不得不为少数 Futures 等待相当长的时间。在空闲状态下有大量 CPU 的更大任务。我想通过检查任务是否超过特定大小来改变这一点,如果是,则将任务再次拆分为多个 Futures解决一个子任务。然而,这会导致更糟糕的表现,因为第一组 future 突然按顺序执行。没有任何任务或子任务是相关的,因此可以相互独立地解决。
以下是此行为的概念证明。在 Scala 2.13.4 中测试。

import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.sys.process._

object Main {

val workSet : List[Int] = List(
3000,
3000,
3000,
12000,
3000,
3000,
3000,
12000,
3000,
3000,
3000,
3000,
3000,
)

def stupidWait(wait : Int) : Int = {
println(s"waiting for $wait")
val start = System.currentTimeMillis()
while ( (start + wait) > System.currentTimeMillis()) {}
1
}

def smartWait(wait : Int) : Int = {
val work = if(wait > 3000) {
List(3000,3000,3000,3000)
} else {
List(3000)
}
println(s"smart working with ${work}")
val future = Future.sequence {
work.map {
wait => Future { stupidWait(wait) }
}
}
Await.ready(future, Inf)
1
}

def time() : String = {
"date".!!
}

def main(args : Array[String]) : Unit = {
println(s"${time()} dumb start")
val futureI = Future.sequence {
workSet.map {
wait => Future { stupidWait(wait) }
}
}
Await.ready(futureI,Inf)
println(s"${time()} dumb end")
println(s"${time()} smart start")
val futureII = Future.sequence {
workSet.map {
wait => Future { smartWait(wait) }
}
}
Await.ready(futureII,Inf)
println(s"${time()} smart end")
}

}

输出是:
Thu Mar  4 08:09:42 PM CET 2021
dumb start
...
Thu Mar 4 08:09:54 PM CET 2021
dumb end
Thu Mar 4 08:09:54 PM CET 2021
smart start
...
Thu Mar 4 08:10:39 PM CET 2021
smart end
我原以为智能等待至少会一样快甚至更快,因为较大的等待时间现在被分成 4 个潜在的并行等待时间。然而事实并非如此。
为什么 smartWait 不快,我必须如何更改代码才能使 smartWait 按预期工作?

最佳答案

执行程序可用的线程数受限于默认情况下您拥有的内核数(并行运行比执行它们的内核数更多的东西通常毫无意义)。
我想,你可能有 8 个内核。因此,前 8 个等待立即开始,其他 5 个在排队。然后 3 秒后 6 个线程完成它们的任务,并拿起剩余的 5 个。因此,再过 3 秒后,除了两个更大的任务之外,其他所有任务都完成了,它们继续旋转 6 秒。
现在,“(显然不是那么)智能等待”会发生什么?
我在输出中添加了自开始和线程名称以来的秒数,以便更轻松地跟踪正在发生的事情:

    0: scala-execution-context-global-16: smart working with List(3000)
0: scala-execution-context-global-18: smart working with List(3000)
0: scala-execution-context-global-21: smart working with List(3000, 3000, 3000, 3000)
0: scala-execution-context-global-19: smart working with List(3000)
0: scala-execution-context-global-17: smart working with List(3000, 3000, 3000, 3000)
0: scala-execution-context-global-14: smart working with List(3000)
0: scala-execution-context-global-20: smart working with List(3000)
0: scala-execution-context-global-15: smart working with List(3000)
0: scala-execution-context-global-22: waiting for 3000
3: scala-execution-context-global-22: waiting for 3000
3: scala-execution-context-global-20: waiting for 3000
6: scala-execution-context-global-22: waiting for 3000
6: scala-execution-context-global-20: waiting for 3000
9: scala-execution-context-global-20: waiting for 3000
9: scala-execution-context-global-22: waiting for 3000
12:

12: scala-execution-context-global-22: waiting for 3000
15: scala-execution-context-global-20: waiting for 3000
15: scala-execution-context-global-17: smart working with List(3000)
15: scala-execution-context-global-21: waiting for 3000
15: scala-execution-context-global-22: waiting for 3000
18: scala-execution-context-global-20: waiting for 3000
18: scala-execution-context-global-19: smart working with List(3000)
18: scala-execution-context-global-16: waiting for 3000
18: scala-execution-context-global-21: waiting for 3000
18: scala-execution-context-global-22: smart working with List(3000)
18: scala-execution-context-global-18: smart working with List(3000)
21: scala-execution-context-global-17: waiting for 3000
21: scala-execution-context-global-20: waiting for 3000
21: scala-execution-context-global-16: smart working with List(3000)
21: scala-execution-context-global-15: waiting for 3000
21: scala-execution-context-global-21: waiting for 3000
看,如何首先启动 8 个“智能”任务。它们都被阻塞了,因为池中没有线程来执行内部 future 。
然后添加线程 #22(第一列中带有 0 的最后一行)。
备注 :在 ForkJoinPool 中有一些非常聪明和复杂的逻辑。用于检测池中的所有线程都被阻塞等待某个条件的实现,并在发生这种情况时启动一个额外的线程(另一个答案中提到的 blocking 用于帮助实现这一点。遵循该建议将使这个测试在大约 3 秒内完成......但是当你需要实际的内核时它是相当无用的 IRL)。它并不总是有效,并且不适用于所有线程池实现。如果您使用了默认执行程序以外的其他执行程序(或以其他方式阻止),则整个过程很可能会在此时被锁定。
因此,它检测到死锁,并启动线程 22 来解决它。这个新线程从队列中挑选一个提交的 future 并运行它三秒钟。这释放了线程 22 和线程 20,因此执行了另外两个任务(在 3 秒标记处)。这还需要 3 秒。看起来运行的任务属于较大的项目之一,所以没有额外的线程被释放,我们仍然只有 20 和 22,其他一切都被阻塞了。所以他们选择另外两个任务运行 3 秒,依此类推。
请注意,在 12 秒标记处,只有 7 个任务实际运行,仅占列表的 1/3 左右。
您可以像这样继续跟踪输出以查看事情的进展情况。
使用 future 的一个相当普遍的经验法则是永远不要阻塞事件线程。它实际上非常危险,并且可能会锁定整个过程(正如我上面所说的,我们在这里没有得到它非常幸运)。
在 future 之外进行拆分,这样您就不必阻塞等待内部 future 返回可能是您遇到的最简单和最安全的解决方案:
    Future.traverse(
workSet.flatMap {
case 3000 => Seq(3000)
case => Seq.fill(4)(3000)
}
) { n => Future(stupidWait(n)) }
或者,为了尽量减少对原始代码的更改,只需制作 smartWait返回 future 而不是等待它,然后在 main去掉外层 Future就做 Future.traverse(workSet)(smartWait) .
这应该在大约 6 秒内完成。

关于multithreading - 嵌套 future 导致更差的性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66481706/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com