
scala - Controlling the main thread with multiple Future jobs in Scala

Reposted. Author: 行者123. Updated: 2023-12-02 23:25:57

def fixture =
  new {
    val xyz = new XYZ(spark)
  }

// Mutable list of futures, i.e. a List[Future]
val fList: scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]] =
  scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]]()

test("test case") {
  val tasks = for (i <- 1 to 10) {
    fList ++ scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]](Future {
      println("Executing task " + i)
      val ds = read(fixture.etlSparkLayer, i)
      ds
    })
  }

  Thread.sleep(1000 * 4200)
  val futureOfList = Future.sequence(fList) // list of Future jobs turned into one Future
  println(Await.ready(futureOfList, Duration.Inf))

  val await_result: Seq[Dataset[Row]] = Await.result(futureOfList, Duration.Inf)
  println("Squares: " + await_result)

  futureOfList.onComplete {
    case Success(x) => println("Success!!! " + x)
    case Failure(ex) => println("Failed !!! " + ex)
  }
}

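I am running a test case that builds a Future sequence from a list of Futures. I am trying to run the same function several times in parallel using Scala Futures. On my system only 4 jobs start at a time; once those 4 finish, the next 4 start, and so on until all jobs are done. So how can I start more than 4 jobs at a time, and how can the main thread wait for all the Future threads to finish? I tried Await.result and Await.ready but could not control the main thread, so I am using Thread.sleep to hold it. The program reads from RDBMS tables and writes to Elasticsearch. So the main question is: how do I control the main thread?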
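One detail in the test above may explain why Await.result and Await.ready appeared to have no effect: `fList ++ ...` builds a new collection and leaves `fList` itself unchanged, so `Future.sequence(fList)` is given an empty list and completes immediately. The sketch below shows the difference between `++` and `+=` in isolation. It assumes the global ExecutionContext, uses `ListBuffer` in place of `MutableList`, and the object and method names are made up for illustration.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object AppendDemo {
  // `++` returns a new collection; the receiver stays empty.
  def withPlusPlus(): Int = {
    val futures = ListBuffer.empty[Future[Int]]
    for (i <- 1 to 10) {
      futures ++ ListBuffer(Future(i * i)) // result is discarded
    }
    // Nothing to wait for, so this returns at once with an empty result.
    Await.result(Future.sequence(futures), 10.seconds).size
  }

  // `+=` appends in place, so all ten futures are awaited.
  def withPlusEquals(): Int = {
    val futures = ListBuffer.empty[Future[Int]]
    for (i <- 1 to 10) {
      futures += Future(i * i)
    }
    Await.result(Future.sequence(futures), 10.seconds).size
  }
}
```

With `+=`, the Await call blocks the main thread until every future has finished, so no Thread.sleep should be needed.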

Best answer

Assuming you are using the scala.concurrent.ExecutionContext.Implicits.global ExecutionContext, you can tune the number of threads as described here:

https://github.com/scala/scala/blob/2.12.x/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L100

In particular, the following system properties:

scala.concurrent.context.minThreads
scala.concurrent.context.numThreads
scala.concurrent.context.maxThreads
scala.concurrent.context.maxExtraThreads
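As a rough illustration of the system-property route, the sketch below sets two of those properties from code before the global pool is first used. It rests on two assumptions taken from reading the linked source: the properties are read once, when the global ExecutionContext is created, and maxThreads caps numThreads, so both need raising. The object and method names are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object GlobalPoolSizing {
  // Assumption: this is called before anything touches ExecutionContext.global,
  // since the properties are only read when the global pool is created.
  def configure(threads: Int): Unit = {
    System.setProperty("scala.concurrent.context.numThreads", threads.toString)
    // maxThreads defaults to the number of cores and acts as an upper bound.
    System.setProperty("scala.concurrent.context.maxThreads", threads.toString)
  }

  // Runs n trivial tasks on the global pool and waits for all of them.
  def runTasks(n: Int): List[Int] = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val all = Future.traverse((1 to n).toList)(i => Future(i * 2))
    Await.result(all, 1.minute)
  }
}
```

The same values can be passed as JVM options instead, for example -Dscala.concurrent.context.numThreads=8 -Dscala.concurrent.context.maxThreads=8, which avoids any ordering concern inside the program.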
Otherwise, you can rewrite your code along these lines:

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent._
import java.util.concurrent.Executors

test("test case") {
  // Dedicated pool, so the number of concurrently running tasks is explicit
  implicit val ec: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(NUMBEROFTHREADSYOUWANT))
  val aFuture = Future.traverse(1 to 10) {
    i => Future {
      println("Executing task " + i)
      read(fixture.etlSparkLayer, i) // If this is a blocking operation you may want to consider wrapping it in a `blocking {}`-block.
    }
  }
  aFuture.onComplete(_ => ec.shutdownNow()) // Only for this test, and to make sure the pool gets cleaned up
  val await_result: immutable.Seq[Dataset[Row]] = Await.result(aFuture, 60.minutes) // Or other timeout
  println("Squares: " + await_result)
}
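For readers who want to try the pattern without Spark or a test framework, here is a minimal self-contained sketch of the same idea. `fakeRead` is a hypothetical stand-in for `read(fixture.etlSparkLayer, i)`, and the pool size, timeout, and object name are illustrative choices, not values from the original answer.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object FixedPoolSketch {
  // Hypothetical stand-in for the real read: sleeps briefly, returns i squared.
  def fakeRead(i: Int): Int = {
    Thread.sleep(100)
    i * i
  }

  // Runs `tasks` jobs on a pool of `poolSize` threads and blocks until all finish.
  def runAll(poolSize: Int, tasks: Int): List[Int] = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))
    try {
      val all = Future.traverse((1 to tasks).toList)(i => Future(fakeRead(i)))
      Await.result(all, 1.minute) // the calling thread waits here
    } finally {
      ec.shutdown() // release the pool threads once the work is done
    }
  }
}
```

Calling `FixedPoolSketch.runAll(10, 10)` lets all ten tasks run at once, while a smaller `poolSize` runs them in batches, much like the 4-at-a-time behaviour described in the question. Results come back in input order either way.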

Regarding "scala - Controlling the main thread with multiple Future jobs in Scala", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/43073008/
