gpt4 book ai didi

reactive-programming - 如何使用 RxJava 串行批处理长进程?

转载 作者:行者123 更新时间:2023-12-02 09:34:11 24 4
gpt4 key购买 nike

我有一个很大的字符串列表,需要根据远程 API 检查。

Observable.from(List<String> strings) // let's say the `strings` has > 5000 items
.buffer(50) // splitting the strings into 50-sized chunks, it returns Observable<List<String>> (fast)
.flatMap((strings) -> {
// checkPhoneNumbers is a network call using Retrofit's RxJava (slow)
return mSyncApi.checkPhoneNumbers(strings);
})
.reduce( ... ) // aggregate all checking results
.subscribe( ... );

问题是buffer()似乎发出List<String>太快了,所有多个 .checkPhoneNumbers()几乎同时被处决。 我想要实现的是入队 .checkPhoneNumbers()更好地支持连接速度慢的设备。

限制发出的 List<String>按预定义的时间间隔没有意义,因为这对于具有闪电般快速连接的设备来说是一个缺点。我尝试过RxJava的serialize()就在flatMap()之后但这似乎没有任何区别(尽管我不知道 serialize 的使用是否正确)。

任何替代方法表示赞赏!谢谢。

最佳答案

正如 @zsxwing 所建议的,如果您试图限制 flatMap 内部发生的并发,我认为 maxConcurrent 重载就是您所需要的。

例如:https://gist.github.com/benjchristensen/a0350776a595fd6e3810#file-parallelexecution-java-L78

private static void flatMapBufferedExampleAsync() {
final AtomicInteger total = new AtomicInteger();
Observable.range(0, 500000000)
.doOnNext(i -> total.incrementAndGet())
.buffer(100)
.doOnNext(i -> System.out.println("emit " + i))
.flatMap(i -> {
return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> {
// simulate computational work
try {
Thread.sleep(10);
} catch (Exception e) {
}
return item + " processed " + Thread.currentThread();
});
}, 2 /* limit concurrency to 2 */) // <--- note argument here
.toBlocking().forEach(System.out::println);

System.out.println("total emitted: " + total.get());
}

关于reactive-programming - 如何使用 RxJava 串行批处理长进程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28938234/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com