gpt4 book ai didi

multithreading - Scala 流生产和处理的多线程

转载 作者:行者123 更新时间:2023-12-02 03:30:14 24 4
gpt4 key购买 nike

假设我有一个相当标准的生产者/消费者问题要在 Scala 中编写,具有这种结构:

  1. 构造一个延迟生成元素的 StreamIterator
  2. StreamIterator 上使用mapforeach 来处理这些元素并对它们执行某些操作。

这似乎工作得很好,但它看起来像是单线程的:当我们想要处理一个新元素时,我们要求生成它,生成后,我们就开始处理它。我真正想要的是一种在处理前一个元素时继续生成的机制。有没有办法让 Scala 做到这一点?

我知道我可以使用 BlockingQueue,但这对我来说似乎非常必要。我希望有一种方法可以让 Stream 在另一个线程上继续生成元素。

一旦我们提前生成它们,当然就不再是惰性求值了。但我也不想要提前生成整个流的急切评估。我想要一个 BlockingQueue 的模拟,但在功能范例中。

最佳答案

您可以像这样将流中的项目映射到 future 的处理:

def process(x: Int): Int = // do something time consuming
val asyncProducer = Stream.from(0).map(x => future { process(x)})

现在这不会产生任何结果,因为 Stream 不会生成项目,直到您尝试具体化它们,就像您建议您的流工作一样。因此,如果您想启动对接下来 10 个项目的处理,您可以像这样简单地具体化它们:

val futureResults = asyncProducer.take(10).toList

这将启动 10 个并行进程(取决于范围内的 ExecutionContext)并生成一个 List[Future[Int]]。为了能够接收所有这些工作项,您可以将 future 的列表排序为列表的 future :

val futureResult = Future.sequence(futureResults)

现在,您可以映射这个 future 以获得结果列表并将它们交给某个接收者并开始下一个处理 block 。

关于multithreading - Scala 流生产和处理的多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27418734/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com