gpt4 book ai didi

scala - 在 Scala 中最多只能做 4 个并发 future

转载 作者:行者123 更新时间:2023-12-04 01:56:21 26 4
gpt4 key购买 nike

我认为使用 future 可以很容易地让我触发一次代码块,但似乎我一次只能有 4 个 future 。

这个限制是从哪里来的,或者我是在滥用 Futures 这样使用它吗?

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import java.util.Calendar

object Main extends App{

val rand = scala.util.Random

for (x <- 1 to 100) {
val f = Future {
//val sleepTime = rand.nextInt(1000)
val sleepTime = 2000
Thread.sleep(sleepTime)

val today = Calendar.getInstance().getTime()
println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
1;
}
}

Thread.sleep(10000)
}

输出:
Future: 3 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 2 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 4 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 1 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 7 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 5 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 6 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 8 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 9 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 11 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 10 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 12 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 16 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 13 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 15 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 14 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015

我希望他们都同时出现。

为了给出一些上下文,我想我可以使用这个结构并通过一个主循环来扩展它,在这个主循环中,它根据从指数分布中提取的值休眠每个循环,以模拟用户到达/执行查询。每次 sleep 后,我想通过将查询发送到程序的驱动程序(在本例中为 Spark,并且驱动程序允许多个线程使用它)来执行查询。还有比使用 Futures 更明显的方法吗?

最佳答案

当您使用 import ExecutionContext.Implicits.global 时,
它创建具有与 CPU 数量相同大小的线程池。

ExecutionContext.scala 的来源

The default ExecutionContext implementation is backed by a work-stealing thread pool. By default, the thread pool uses a target number of worker threads equal to the number of [[https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors-- available processors]].



还有一个很好的 StackOverflow 问题: What is the behavior of scala.concurrent.ExecutionContext.Implicits.global?

由于线程池的默认大小取决于 CPU 的数量,如果要使用更大的线程池,则必须编写类似
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(8))

在执行 Future 之前。

(在您的代码中,您必须将它放在 for 循环之前。)

请注意,工作窃取池是在 java 8 中添加的,scala 有自己的 ForkJoinPool 来完成工作窃取: scala.concurrent.forkjoin.ForkJoinPool vs java.util.concurrent.ForkJoinPool

此外,如果您希望每个 Future 一个线程,您可以编写类似
implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)

因此,以下代码并行执行 100 个线程
import scala.concurrent._
import java.util.concurrent.Executors

object Main extends App{
for (x <- 1 to 100) {
implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)
val f = Future {
val sleepTime = 2000
Thread.sleep(sleepTime)

val today = Calendar.getInstance().getTime()
println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
1;
}
}

Thread.sleep(10000)
}

除了工作窃取线程池和单线程执行器,还有一些其他的执行器: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html

详细阅读文档:
http://docs.scala-lang.org/overviews/core/futures.html

关于scala - 在 Scala 中最多只能做 4 个并发 future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32306671/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com