gpt4 book ai didi

java - 来自缓存的 Flowable 和使用 RxJava 的其他 Flowable for DataSource

转载 作者:行者123 更新时间:2023-12-02 05:10:49 26 4
gpt4 key购买 nike

我对 RxJava 还很陌生,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。

我有 Dao,它提供 Flowable<Item>在某些范围内到 DataSource 类。该数据源具有本地缓存​​,可以随时失效。当存储库向数据源请求某个范围(可能超出数据源边界,在完全缓存之前边界是未知的)时,它必须产生错误(或以其他方式通知存储库)。

我想创建Flowable<Item> DataSource 的方法将从缓存中发出项目,如果需要,将它们与 Flowable<Item> dao.getRange(...) 连接起来,同时缓存来自 dao 的新项目。另外,我需要处理来自 dao 的错误,它们必须被处理或转换为更高级别的错误。

DataSource.class :

List<Item> cache;

Flowable<Item> getRange(int start, int amount) {

final int cacheSize = cache.size();
final int canLoadFromCache = cacheSize - start;
final int loadFromDao = amount - canLoadFromCache;

if (isCorrupted) return Flowable.fromCallable(() -> {
throw new Exception("CorruptedDatasource");
});

Flowable<Item> cacheFlow = null;
Flowable<Item> daoFlow = null;

if (canLoadFromCache > 0) {
cacheFlow = Flowable.fromIterable(
cache.subList(start, canLoadFromCache)
);

daoFlow = dao.getRange(
uri,
cacheSize, //start
loadFromDao //amount
);
} else {
if (isFullyCached) return Flowable.fromCallable(() -> {
throw new Exception("OutOfBounds");
});

//To not deal with gaps load and cache data between;
//Or replace it with data structure which can handle for us;
daoFlow = dao.getRange(
uri,
cacheSize,
start - cacheSize + amount);
//all these items should be cached;
//other cached and put downstream;
//Dao errs should be converted to higher lever exceptions,
//Or set flags in DataSource;
}
// return concatenated flowable;
}

在更高级别的存储库中,连接来自多个数据源的数据,因此必须有一种方法来连接来自多个数据源的范围,如果一个数据源还不够,则应添加下一个数据源的范围。

请帮助我!

最佳答案

尝试使用 concatconcatEager 连接两个可观察量。此外,doOnNext()doOnError() 可以帮助您进行缓存和错误处理

List<Item> cache;

Flowable<Item> getRange(int start, int amount) {

...
if (isFullyCached) return Flowable.fromCallable(() -> {
throw new Exception("OutOfBounds");
});

//To not deal with gaps load and cache data between;
//Or replace it with data structure which can handle for us;
daoFlow = dao.getRange(
uri,
cacheSize,
start - cacheSize + amount);
//all these items should be cached;
//other cached and put downstream;
.doOnNext(result -> /* insert caching logic here */)
//Dao errs should be converted to higher lever exceptions,
//Or set flags in DataSource;
.doOnError(error -> /* handle error here */)
.onErrorReturn(/* and/or return some empty item */)
}
// return concatenated flowable;
return cacheFlow.concat(daoFlow);
}

关于java - 来自缓存的 Flowable 和使用 RxJava 的其他 Flowable for DataSource,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56327395/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com