gpt4 book ai didi

scala - 可从 Futures 观察 - 来自多个线程的 onNext

转载 作者:行者123 更新时间:2023-12-02 04:50:59 25 4
gpt4 key购买 nike

我想根据 Futures 列表的结果实时生成一个 Observable

在最简单的情况下,假设我有一个正在使用 Future.sequence 运行的 futures 列表,并且我只是使用一个 Observable 来监控它们的进度每次完成时都会告诉我。我基本上是这样做的:

  def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
Observable[String](observer => {
val loudFutures: List[Future[Int]] = futs.map(f => {
f onComplete {
case Success(a) => observer.onNext(s"just did $a more")
case Failure(e) => observer.onError(e)
}
f
})
Future.sequence(loudFutures) onComplete {
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
})
}

这在我的测试环境中运行良好。但我刚刚读到 onNext 不应该从不同的线程调用,至少不小心没有重叠的调用。解决这个问题的推荐方法是什么?似乎许多现实世界中 Observables 的使用都需要从这样的异步代码中调用 onNext ,但我在文档中找不到类似的示例。

最佳答案

The Observable Contract

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

看看Serialize

It is possible for an Observable to invoke its observers’ methods asynchronously, perhaps from different threads. This could make such an Observable violate the Observable contract, in that it might try to send an OnCompleted or OnError notification before one of its OnNext notifications, or it might make an OnNext notification from two different threads concurrently. You can force such an Observable to be well-behaved and synchronous by applying the Serialize operator to it.

关于scala - 可从 Futures 观察 - 来自多个线程的 onNext,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35959727/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com