gpt4 book ai didi

java - RxJava 中的任务取消是如何工作的?

转载 作者:搜寻专家 更新时间:2023-10-30 19:42:30 24 4
gpt4 key购买 nike

我不清楚如何在 RXJava 中实现任务取消。

我对移植使用 Guava 的 ListenableFuture 构建的现有 API 很感兴趣。我的用例如下:

  • 我有一个单一操作,由 Futures.transform()
  • 连接的一系列 future 组成
  • 多个订阅者观察操作的最终 future 。
  • 每个观察者都可以取消最终的 future ,所有观察者都见证取消事件。
  • 取消最终的 future 会导致取消其依赖项,例如在序列 1->2->3 中,取消 3 传播到 2,等等。

RxJava wiki 中关于此的信息非常少;我能找到的关于取消的唯一引用提到 Subscription 等同于 .NET 的 Disposable,但据我所知,Subscription 只提供取消订阅后续值的能力在序列中。

我不清楚如何通过此 API 实现“任何订阅者都可以取消”语义。我是不是以错误的方式思考这个问题?

如有任何意见,我们将不胜感激。

最佳答案

了解Cold vs Hot Observables很重要.如果你的 Observables 是冷的,那么如果你没有订阅者,它们的操作将不会执行。因此,要“取消”,只需确保所有观察者都取消订阅源 Observable。

但是,如果源中只有一个 Observer 取消订阅,而其他 Observer 仍然订阅了源,则不会导致“取消”。在那种情况下,您可以使用(但它不是唯一的解决方案)ConnectableObservables .另见 this link about Rx.NET .

使用 ConnectableObservables 的一种实用方法是在任何冷 Observable 上简单地调用 .publish().refCount()。这样做是创建一个单一的“代理”观察者,它将事件从源传递到实际的观察者。当最后一个实际观察者取消订阅时,代理观察者取消订阅。

要手动控制 ConnectableObservable,只需调用 coldSource.publish() 即可获得 ConnectableObservable 的实例。然后你可以调用 .connect() ,它会返回你“代理”观察者的订阅。要手动“取消”源,您只需取消订阅代理观察者的订阅即可。


对于您的特定问题,您还可以使用 .takeUntil() 运算符。

假设你的“最终 future ”在RxJava中被移植为finalStream,假设“取消事件”是Observables cancelStream1, cancelStream2,等,然后“取消”由 finalStream 产生的操作变得相当简单:

Observable<FooBar> finalAndCancelableStream = finalStream
.takeUntil( Observable.merge(cancelStream1, cancelStream2) );

在图表中,this is how takeUntil works , 和 this is how merge works .

用简单的英语,您可以将其理解为“finalAndCancelableStream 是 finalStream,直到 cancelStream1 或 cancelStream2 发出事件”。

关于java - RxJava 中的任务取消是如何工作的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25344088/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com