gpt4 book ai didi

kotlin - RxJava2和改造:如何获取数据页面

转载 作者:行者123 更新时间:2023-12-02 02:53:53 25 4
gpt4 key购买 nike

目标:我想重复调用返回页面数据的翻新服务(GET),直到用完其页面为止。从第0页到第n页。

首先,我已经看过these two答案。第一个实际可行,但我不太喜欢递归解决方案,因为它可能导致堆栈溢出。当您尝试使用调度程序时,第二个失败。

这是第二个示例:

Observable.range(0, 5/*Integer.MAX_VALUE*/) // generates page values
.subscribeOn(Schedulers.io()) // need this to prevent UI hanging
// gamesService uses Schedulers.io() by default
.flatMapSingle { page -> gamesService.getGames(page) }
.takeWhile { games -> games.isNotEmpty() } // games is a List<Game>
.subscribe(
{ games -> db.insertAll(games) },
{ Logger.e(TAG, it, "Error getting daily games: ${it.message}") }
)

我希望这样做是在 gamesService.getGames(page)返回空列表的那一刻停止。取而代之的是,它会随着页面值的增加,无限次地到达端点。我已经使用 Single.just(intVal)在单元测试中进行了一些试验,并确定问题似乎是我的服务已自动订阅 Schedulers.io()。这就是我定义改造服务的方式:
private inline fun <reified T> createService(okClient: OkHttpClient): T {
val rxAdapter = RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())
val retrofit = Retrofit.Builder()
.baseUrl(config.apiEndpoint.endpoint())
.client(okClient)
.addCallAdapterFactory(rxAdapter)
.addConverterFactory(moshiConverterFactory())
.build()

return retrofit.create(T::class.java)
}

在这里不使用 createWithScheduler()确实不是一个选择。

这是我尝试过的另一个想法:
val atomic = AtomicInteger(0)
Observable.generate<Int> { it.onNext(atomic.getAndIncrement()) }
.subscribeOn(Schedulers.io())
.flatMapSingle { page -> gamesService.getGames(page) }
.takeWhile { games -> games.isNotEmpty() }
.subscribe(
{ games -> dailyGamesDao.insertAll(games) },
{ Logger.e(TAG, it, "Error getting daily games: ${it.message}") }
)

这是另一种情况,直到我引入 Scheduler为止,它都按预期工作。当我期望 takeWhile发现一个空列表时,该生成器会停止运行时,生成器会生成太多值。

我还尝试了各种 concat(concatWith,concatMap等)。

在这一点上,我真的只是在寻找可以帮助我纠正RxJava运算符显然(对他们)和完全基本的误解的人。

最佳答案

我找到了部分解决方案。 (如果以后找到“最终”解决方案,则可以稍后编辑此答案。)

tl; dr 我应该将Single转换为Observable,并使用带有flatMap参数的maxConcurrency重载。例如:

Observable.range(0, SOME_SUFFICIENTLY_LARGE_NUMBER)
.subscribeOn(Schedulers.io())
.flatMap({ page -> gamesService.getGames(page).toObservable }, 1 /* maxConcurrency */)
.takeWhile { games -> games.isNotEmpty() }
.subscribe(
{ games -> dailyGamesDao.insertAll(games) },
{ Logger.e(TAG, it, "Error getting daily games: ${it.message}") }
)

基本上做到了。通过将并发线程数限制为1,我现在具有我正在寻求的“一个接一个”的行为。我对此不满意的唯一一点是,我想这是个小问题,因为我的基本 Observable.range()仍然可以发出很多值-下游 Single s / Observable所用的方式比以往任何时候都多。

PS:我较早找不到此解决方案的原因之一是我正在使用RxJava 2.1.9。当我将其推至2.1.14时,我可以使用新的重载。那好吧。

关于kotlin - RxJava2和改造:如何获取数据页面,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50537230/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com