gpt4 book ai didi

scala - "slow"Future.traverse 版本中是否有内部版本?

转载 作者:行者123 更新时间:2023-12-04 01:54:12 26 4
gpt4 key购买 nike

我发现为一个用户请求构建很多 Futures 通常是一种不好的做法。这些 Future 可以填充一个执行上下文,这将影响其他请求。这不太可能是您真正想要的。保持 Futures 数量很小很简单 - 仅在 for-comprehensions 中创建新的 Futures,使用 flatMap 等。但有时可能需要为每个 Seq 项创建一个 Future。使用 Future.sequence 或 Future.traverse 会导致上述问题。所以我最终得到了这个解决方案,它不会同时为每个集合项创建 Futures:

  def ftraverse[A, B](xs: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
if(xs.isEmpty) Future successful Seq.empty[B]
else f(xs.head) flatMap { fh => ftraverse(xs.tail)(f) map (r => fh +: r) }
}

我想知道,也许我正在发明一个轮子,实际上这样的功能已经存在于 Scala 的标准库中?另外我想知道,你遇到过描述的问题吗,你是怎么解决的?也许,如果这是 Futures 的一个众所周知的问题,我应该在 Future.scala 中创建一个拉取请求,以便此函数(或更通用的版本)将包含在标准库中?

UPD:更通用的版本,并行性有限:
  def ftraverse[A, B](xs: Seq[A], chunkSize: Int, maxChunks: Int)(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
val xss = xs.grouped(chunkSize).toList
val chunks = xss.take(maxChunks-1) :+ xss.drop(maxChunks-1).flatten
Future.sequence{ chunks.map(chunk => ftraverse(chunk)(f) ) } map { _.flatten }
}

最佳答案

不,标准库中没有这样的东西。应不应该有,我说不准。我不认为想要执行 Future 很常见s 严格的顺序。但是,当您愿意时,很容易实现您自己的方法,就像您所拥有的那样。为此,我个人只是在我自己的库中保留了一个方法。但是,使用标准库来实现这一点会很方便。如果有,它应该更通用。

修改当前traverse其实很简单处理 Future s 顺序,而不是并行。这是current version , 使用 foldLeft而不是递归:

def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
val fb = fn(a)
for (r <- fr; b <- fb) yield (r += b)
}.map(_.result())
Future s 在 flatMap 之前创建通过分配 val fb = fn(a) (因此之前执行)。只需搬家 fn(a)flatMap延迟创建后续 Future s 在集合中。
def traverseSeq[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
for (r <- fr; b <- fn(a)) yield (r += b)
}.map(_.result())

另一种可以限制执行大量 Future 的影响的方法s 是通过使用不同的 ExecutionContext为他们。例如,在 Web 应用程序中,我可能会保留一个 ExecutionContext用于数据库调用,一种用于调用 Amazon S3,一种用于慢速数据库调用。

一个非常简单的实现可以使用固定线程池:
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)

大量 Future s 在此处执行将填充 ExecutionContext ,但这会阻止它们填充其他上下文。

如果您使用 Akka,您可以轻松创建 ExecutionContext s 从配置使用 DispatchersActorSystem :
my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 10
}
throughput = 100
}

如果您有 ActorSystemsystem然后您可以通过以下方式访问它:
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")

所有这些都取决于您的用例。虽然我确实将异步计算分离到不同的上下文中,但有时我仍然想要 traverse依次平滑这些上下文的使用。

关于scala - "slow"Future.traverse 版本中是否有内部版本?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28514621/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com