gpt4 book ai didi

reactive-programming - 如何使用/控制 RxJava Observable.cache

转载 作者:行者123 更新时间:2023-12-04 02:34:14 25 4
gpt4 key购买 nike

我正在尝试使用 RxJava 缓存机制( RxJava2 ),但我似乎无法理解它是如何工作的,或者我如何控制缓存的内容,因为有 cache运算符(operator)。
我想在发出新数据之前用一些条件验证缓存的数据。
例如

someObservable.
repeat().
filter { it.age < maxAge }.
map(it.name).
cache()

我如何检查和过滤缓存值并在它成功时发出它,否则我将请求一个新值。
由于该值会定期更改,因此我需要先验证缓存是否仍然有效,然后才能请求新缓存。
还有 ObservableCache<T>类,但我找不到任何使用它的资源。
任何帮助将非常感激。谢谢。

最佳答案

这不是重放/缓存的工作方式。请先阅读 #replay/#cache 文档。
重播
这个操作符返回一个 ConnectableObservable,它有一些方法 (#refCount/#connect/#autoConnect) 用于连接到源。
当#replay 在没有过载的情况下应用时,源订阅被多播,并且所有发出的值都将被重播。源订阅是惰性的,可以通过#refCount/#connect/#autoConnect 连接到源。

Returns a ConnectableObservable that shares a single subscription to the underlying ObservableSource that will replay all of its items and notifications to any future Observer.


在没有任何连接方法的情况下应用 #relay (#refCount/#connect/#autoConnect) 不会在订阅时发出任何值

A Connectable ObservableSource resembles an ordinary ObservableSource, except that it does not begin emitting items when it is subscribed to, but only when its connect method is called.


重播(1)#autoConnect(-1)/#refCount(1)/#connect
应用 replay(1) 将缓存最后一个值,并将在每个订阅上发出缓存的值。 #autoConnect 将立即连接打开连接并保持打开状态,直到发生终端事件(onComplete、onError)。 #refCount 是模拟的,但是当所有订阅者都消失时,它会与源断开连接。 #connect 操作符可以用于,当你需要等待时,当对可观察对象完成所有订阅时,为了不丢失值。
用法
#replay(1)——大部分应该在可观察的末尾使用。
sourcObs.
.filter()
.map()
.replay(bufferSize)
.refCount(connectWhenXSubsciberSubscribed)
警告
应用没有缓冲区限制或到期日期的#replay 会导致内存泄漏,当您观察到无限时
缓存/cacheWithInitialCapacity
运算符类似于带有 autoConnect(1) 的 #replay。运算符(operator)将缓存每个值并在每个订阅上重放。

The operator subscribes only when the first downstream subscriber subscribes and maintains a single subscription towards this ObservableSource. In contrast, the operator family of replay() that return a ConnectableObservable require an explicit call to ConnectableObservable.connect().Note: You sacrifice the ability to dispose the origin when you use the cache Observer so be careful not to use this Observer on ObservableSources that emit an infinite or very large number of items that will use up memory. A possible workaround is to apply takeUntil with a predicate or another source before (and perhaps after) the application of cache().


例子
    @Test
fun skfdsfkds() {
val create = PublishSubject.create<Int>()

val cacheWithInitialCapacity = create
.cacheWithInitialCapacity(1)

cacheWithInitialCapacity.subscribe()

create.onNext(1)
create.onNext(2)
create.onNext(3)

cacheWithInitialCapacity.test().assertValues(1, 2, 3)
cacheWithInitialCapacity.test().assertValues(1, 2, 3)
}
用法
使用缓存操作符,当你无法控制连接阶段时

This is useful when you want an ObservableSource to cache responses and you can't control the subscribe/dispose behavior of all the Observers.


警告
与 replay() 一样,缓存是无界的,可能会导致内存泄漏。

Note: The capacity hint is not an upper bound on cache size. For that, consider replay(int) in combination with ConnectableObservable.autoConnect() or similar.


进一步阅读
https://blog.danlew.net/2018/09/25/connectable-observables-so-hot-right-now/
https://blog.danlew.net/2016/06/13/multicasting-in-rxjava/

关于reactive-programming - 如何使用/控制 RxJava Observable.cache,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62607426/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com