gpt4 book ai didi

scala - 为什么在 Akka Dispatcher 上启动时 Futures 中的 Futures 会按顺序运行

转载 作者:行者123 更新时间:2023-12-04 12:39:15 25 4
gpt4 key购买 nike

当我们尝试从参与者的接收方法中启动多个 future 时,我们观察到了一种奇怪的行为。
如果我们将配置的调度程序用作 ExecutionContext,则 future 将在同一线程上按顺序运行。如果我们使用 ExecutionContext.Implicits.global, future 会按预期并行运行。

我们将代码归结为以下示例(更完整的示例如下):

implicit val ec = context.getDispatcher

Future{ doWork() } // <-- all running parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }

Future {
Future{ doWork() }
Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
Future{ doWork() }
Future{ doWork() }
}

一个可编译的例子是这样的:
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

val actorSystem = ActorSystem(s"Experimental")

// Futures not started in future: running in parallel
startFutures(runInFuture = false)(actorSystem.dispatcher)
Thread.sleep(5000)

// Futures started in future: running in sequentially. Why????
startFutures(runInFuture = true)(actorSystem.dispatcher)
Thread.sleep(5000)

actorSystem.terminate()

private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
if (runInFuture) {
Future{
println(s"Start Futures on thread ${Thread.currentThread().getName()}")
(1 to 9).foreach(startFuture)
println(s"Started Futures on thread ${Thread.currentThread().getName()}")
}
} else {
(11 to 19).foreach(startFuture)
}
}

private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
Thread.sleep(500)
println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
}


}

我们尝试了 thread-pool-executor 和 fork-join-executor,结果相同。

我们是否以错误的方式使用 future ?
那么你应该如何产生并行任务?

最佳答案

来自Akka内部的描述 BatchingExecutor (强调我的):

Mixin trait for an Executor which groups multiple nested Runnable.run() calls into a single Runnable passed to the original Executor. This can be a useful optimization because it bypasses the original context's task queue and keeps related (nested) code on a single thread which may improve CPU affinity. However, if tasks passed to the Executor are blocking or expensive, this optimization can prevent work-stealing and make performance worse....A batching executor can create deadlocks if code does not use scala.concurrent.blocking when it should, because tasks created within other tasks will block on the outer task completing.



如果您使用混合在 BatchingExecutor 中的调度程序-- 即 MessageDispatcher 的子类--您可以使用 scala.concurrent.blocking构造以启用嵌套 Futures 的并行性:
Future {
Future {
blocking {
doBlockingWork()
}
}
}

在您的示例中,您将添加 blockingstartFuture方法:
private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
blocking {
println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
Thread.sleep(500)
println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
}
}

运行 startFutures(true)(actorSystem.dispatcher) 的示例输出有了上述变化:
Start Futures on thread Experimental-akka.actor.default-dispatcher-2
Started Futures on thread Experimental-akka.actor.default-dispatcher-2
Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4

关于scala - 为什么在 Akka Dispatcher 上启动时 Futures 中的 Futures 会按顺序运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49694861/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com