gpt4 book ai didi

scala - 使用有界并行性对 Scala Futures 进行排序(不要搞乱 ExecutorContexts)

转载 作者:行者123 更新时间:2023-12-04 14:12:22 26 4
gpt4 key购买 nike

背景:我有一个功能:

  def doWork(symbol: String): Future[Unit]

它会启动一些副作用来获取数据并存储它,并在完成后完成一个 Future。但是,后端基础设施有使用限制,因此不能并行发出超过 5 个这些请求。我有一个我需要通过的 N 个符号列表:
  var symbols = Array("MSFT",...)

但我想对它们进行排序,以便同时执行不超过 5 个。鉴于:
  val allowableParallelism = 5

我目前的解决方案是(假设我正在使用 async/await):
  val symbolChunks = symbols.toList.grouped(allowableParallelism).toList
def toThunk(x: List[String]) = () => Future.sequence(x.map(doWork))
val symbolThunks = symbolChunks.map(toThunk)
val done = Promise[Unit]()
def procThunks(x: List[() => Future[List[Unit]]]): Unit = x match {
case Nil => done.success()
case x::xs => x().onComplete(_ => procThunks(xs))
}
procThunks(symbolThunks)
await { done.future }

但是,出于显而易见的原因,我对此并不十分满意。我觉得这应该可以通过折叠来实现,但是每次我尝试时,我最终都会热切地创建 Futures。我还使用 concatMap 尝试了一个带有 RxScala Observables 的版本,但这似乎也有点过分了。

有没有更好的方法来实现这一点?

最佳答案

我有示例如何使用 scalaz-stream 做到这一点。这是相当多的代码,因为需要将scala Future转换为scalaz Task(延迟计算的抽象)。但是需要将其添加到项目中一次。另一种选择是使用 Task 来定义“doWork”。我个人更喜欢构建异步程序的任务。

  import scala.concurrent.{Future => SFuture}
import scala.util.Random
import scala.concurrent.ExecutionContext.Implicits.global


import scalaz.stream._
import scalaz.concurrent._

val P = scalaz.stream.Process

val rnd = new Random()

def doWork(symbol: String): SFuture[Unit] = SFuture {
Thread.sleep(rnd.nextInt(1000))
println(s"Symbol: $symbol. Thread: ${Thread.currentThread().getName}")
}

val symbols = Seq("AAPL", "MSFT", "GOOGL", "CVX").
flatMap(s => Seq.fill(5)(s).zipWithIndex.map(t => s"${t._1}${t._2}"))

implicit class Transformer[+T](fut: => SFuture[T]) {
def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = {
import scala.util.{Failure, Success}
import scalaz.syntax.either._
Task.async {
register =>
fut.onComplete {
case Success(v) => register(v.right)
case Failure(ex) => register(ex.left)
}
}
}
}

implicit class ConcurrentProcess[O](val process: Process[Task, O]) {
def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task, O, O2]): Process[Task, O2] = {
val actions =
process.
zipWith(f)((data, f) => f(data))

val nestedActions =
actions.map(P.eval)

merge.mergeN(concurrencyLevel)(nestedActions)
}
}

val workChannel = io.channel((s: String) => doWork(s).toTask)

val process = Process.emitAll(symbols).concurrently(5)(workChannel)

process.run.run

当您在范围内进行所有这些转换时,基本上您只需要:
  val workChannel = io.channel((s: String) => doWork(s).toTask)

val process = Process.emitAll(symbols).concurrently(5)(workChannel)

非常简短且自我描述

关于scala - 使用有界并行性对 Scala Futures 进行排序(不要搞乱 ExecutorContexts),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27085085/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com