gpt4 book ai didi

scala - 如何使用 Scala Future 的批处理函数?

转载 作者:行者123 更新时间:2023-12-04 05:51:17 25 4
gpt4 key购买 nike

我有一个函数,我想调用 100 次,但我想以批处理方式执行,以便一次只运行 2 个函数。这是因为该函数可能会给 Internet 连接带来高负载,因此最好以 2 为一组对函数进行批处理。

这是我用 Scala Futures 做的尝试,但似乎不起作用。是否有使用 Scala Futures 批量处理任务列表的标准方法?

  def futureString(s:String): String = {
Thread.sleep(2000)// + (Math.random()*1000).toInt)
println(s"Completed $s")
"end:" + s
}

def processList(list: List[String], blockSize: Int) = {
var futuresProcessing = Set[Future[String]]()
async {
val itemIterator = list.iterator
while (itemIterator.hasNext) {
val item = itemIterator.next()
println("Item is " + item)

if (futuresProcessing.size >= blockSize) {
await {
val completed = Future.firstCompletedOf(futuresProcessing.toSeq)
println("Size : " + futuresProcessing.size)
completed
}
}

val f = future { futureString(item) }
f.onComplete{ case Success(sss) => { futuresProcessing = futuresProcessing - f } }
futuresProcessing = futuresProcessing + f
}
}
}

val list: List[String] = (1 to 200).map(n => "" + n).toList
processList(list, 2)

我想要的是我可以批量处理任何批量大小,并且 futureString 可以在随机的时间完成。因此,假设批次大小为 10,然后开始 10 个项目,当一个项目完成时,应将新项目添加到批次中进行处理。

我开始认为我应该使用 Actor 。

更新: 经过长时间的 sleep 和清醒后,我开始工作了,但我认为使用 Actors 会更好。此外,我认为以下代码和 futuresProcessing Set 的使用存在一些竞争条件问题。
  import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import scala.async.Async.{async, await}
import scala.collection.parallel.mutable
import scala.util.{Success, Try}
import scala.concurrent.Await

def futureString(s:String): Future[String] = {
future {
Thread.sleep(2000 + (Math.random()*1000).toInt)
println(s"Completed $s")
"end:" + s
}
}

def processList(list: List[String], blockSize: Int) = {
val futuresProcessing = mutable.ParSet[Future[String]]()
async {
val itemIterator = list.iterator
while (itemIterator.hasNext) {
val item = itemIterator.next()
println("Item is " + item)

if (futuresProcessing.size >= blockSize) {
await {
val completed = Future.firstCompletedOf(futuresProcessing.toList)
println("Size : " + futuresProcessing.size)
completed
}
}

val f = futureString(item)
futuresProcessing += f
f.onComplete{ case Success(sss) => { futuresProcessing -= f } }
}
}
}

val list: List[String] = (1 to 200).map(n => "" + n).toList
processList(list, 4)

最佳答案

如果您只关心并行处理 2 个批次,那么可能有一个更简单的解决方案:

val data = (1 to 20).map(_.toString()).grouped(2).toList

然后:
val result = data.flatMap(pair => pair.par.map(futureString))

其中产生:
// pause
Completed 1
Completed 2
// pause
Completed 4
Completed 3
// pause
Completed 6
Completed 5
// pause
Completed 8
Completed 7
// pause
Completed 9
Completed 10
// pause
// ..etc

result: List[String] = List(end:1, end:2, end:3, end:4, end:5, end:6, end:7, end
:8, end:9, end:10, end:11, end:12, end:13, end:14, end:15, end:16, end:17, end:1
8, end:19, end:20)

如果你想让它异步完成(因为上面的版本会阻塞),你可以将整个结果处理包装在一个 Future 中并等待它。

关于scala - 如何使用 Scala Future 的批处理函数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20595864/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com