gpt4 book ai didi

java - RxJava 使用 observables 为 android 构建缓存

转载 作者:太空狗 更新时间:2023-10-29 13:16:12 25 4
gpt4 key购买 nike

我很难理解如何使用 RxJava 构建缓存。我的想法是我需要从内存缓存中获取数据或从我的数据库 (dynamoDb) 加载数据。但是,这个缓存应该在 fragment 和/或线程之间共享。所以我需要返回当前正在运行但未完成的现有可观察对象。这允许线程 catch 进度而不做不必要的工作。我是 RxJava 的新手,所以这是我心中的草图(为简洁起见,缺少一些代码):

public class DBCache<K, T> {
private final ConcurrentHashMap<K, Set<T>> resultCache = new ConcurrentHashMap<>;
private final ConcurrentHashMap<K, Observable<Set<T>>> observableCache = new ConcurrentHashMap<>;

private Observable<Set<T>> getFromCache(final DynamoDbCacheKey<K, T> query) {
return Observable.create(new Observable.OnSubscribe<Set<T>>() {
@Override
public void call(Subscriber<? super Set<T>> subscriber) {
Set<T> results = resultCache.get(query.getKey());
if (results != null && results.size() > 0) {
subscriber.onNext(results);
}
subscriber.onCompleted();
}
});
}

public Observable<Set<T>> get(final QueryCacheKey<K, T> query){
Observable<Set<T>> cachedObservable = observableCache.get(query.getKey());
if (cachedObservable != null) {
return cachedObservable;
}
Observable<Set<T>> observable = Observable
.concat(getFromCache(query), getFromNetwork(query))
.first()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.cache();
observableCache.putIfAbsent(query.getKey(), observable);
return observable;
}

private Observable<Set<T>> getFromNetwork(final QueryCacheKey<K, T> query) {
return Observable.create(new Observable.OnSubscribe<Set<T>>() {
@Override
public void call(Subscriber<? super Set<T>> subscriber) {
try {
Set<T> results = loadFromDb(query); //omitted
resultCache.putIfAbsent(query.getKey(), results);
subscriber.onNext(results);
subscriber.onCompleted();
observableCache.remove(query.getKey());
} catch (Exception exception) {
subscriber.onError(exception);
}
}
});
}

是否有更好的方法通过 RxJava 实现这一点(对缓存策略不感兴趣)。有什么想法吗?

最佳答案

下面是一个简单的缓存和检索值的例子:

public class RxCache<K, V> {

final ConcurrentHashMap<K, AsyncSubject<V>> cache;

final Func1<K, Observable<V>> valueGenerator;

public RxCache(Func1<K, Observable<V>> valueGenerator) {
this.valueGenerator = valueGenerator;
this.cache = new ConcurrentHashMap<>();
}

public Observable<V> get(K key) {
AsyncSubject<V> o = cache.get(key);
if (o != null) {
return o;
}

o = AsyncSubject.create();

AsyncSubject<V> p = cache.putIfAbsent(key, o);
if (p != null) {
return p;
}

valueGenerator.call(key).subscribe(o);

return o;
}

public void remove(K key) {
cache.remove(key);
}
}

如果您有多个值,请将 AsyncSubject 替换为 ReplaySubject

关于java - RxJava 使用 observables 为 android 构建缓存,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34513703/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com