gpt4 book ai didi

java - RxJava 组合请求序列

转载 作者:IT老高 更新时间:2023-10-28 20:46:38 29 4
gpt4 key购买 nike

问题

我有两个 API。 Api 1 为我提供了一个项目列表,而 Api 2 为我从 Api 1 获得的每个项目提供了更详细的信息。到目前为止,我解决它的方式导致性能不佳。

问题

在 Retrofit 和 RxJava 的帮助下高效快速地解决这个问题。

我的方法

目前我的解决方案如下所示:

第 1 步:改造执行 Single<ArrayList<Information>>来自 API 1。

第 2 步:我遍历这些项目并向 Api 2 发出每个请求。

第 3 步:Retrofit Returns 依次执行 Single<ExtendedInformation>为了每个项目

第 4 步:在 Api 2 的所有调用完全执行后,我为所有项目创建了一个新对象,并结合了信息和扩展信息。

我的代码

 public void addExtendedInformations(final Information[] informations) {
final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
@Override
public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
if (informationDetailArrayList.size() >= informations.length){
listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
}
}
};

for (Information information : informations) {
getExtendedInformation(ratingRequestListener, information);
}
}

public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
Single<ExtendedInformation> repos = service.findForTitle(information.title);
disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
@Override
public void onSuccess(ExtendedInformation extendedInformation) {
ratingRequestListener.onDownloadFinished(information, extendedInformation);
}

@Override
public void onError(Throwable e) {
ExtendedInformation extendedInformation = new ExtendedInformation();
ratingRequestListener.onDownloadFinished(extendedInformation, information);
}
}));
}

public interface RatingRequestListener {

void onDownloadFinished(Information information, ExtendedInformation extendedInformation);

}

最佳答案

tl;dr 使用 concatMapEagerflatMap 并异步或在调度程序上执行子调用。


长篇大论

我不是安卓开发者,所以我的问题仅限于纯 RxJava(版本 1 和版本 2)。

如果我得到正确的图片,所需的流程是:

some query param 
\--> Execute query on API_1 -> list of items
|-> Execute query for item 1 on API_2 -> extended info of item1
|-> Execute query for item 2 on API_2 -> extended info of item1
|-> Execute query for item 3 on API_2 -> extended info of item1
...
\-> Execute query for item n on API_2 -> extended info of item1
\----------------------------------------------------------------------/
|
\--> stream (or list) of extended item info for the query param

假设 Retrofit 为

生成客户端
interface Api1 {
@GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}

interface Api2 {
@GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}

如果项目的顺序不重要,那么可以只使用flatMap:

api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()))
.subscribe(...)

仅当改造 builder 配置有

  • 使用异步适配器(调用将在 okhttp 内部执行程序中排队)。我个人认为这不是一个好主意,因为你无法控制这个执行者。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
  • 或者使用基于调度器的适配器(调用将在 RxJava 调度器上调度)。这是我的首选,因为您明确选择使用哪个调度程序,它很可能是 IO 调度程序,但您可以自由尝试不同的调度程序。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))

原因是 flatMap 将订阅由 api2.extendedInfo(...) 创建的每个 observable 并将它们合并到生成的 observable 中。因此结果将按照收到的顺序显示。

如果改造客户端设置为异步或设置为在调度程序上运行,则可以设置一个:

api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)

这个结构几乎与前一个结构相同,除了它指示本地每个 api2.extendedInfo 应该在哪个调度程序上运行。

可以调整 flatMapmaxConcurrency 参数来控制您要同时执行多少个请求。尽管我会对此保持谨慎,但您不希望同时运行所有查询。通常默认的 maxConcurrency 就足够了(128)。

现在如果原始查询的顺序很重要concatMap 通常是按顺序但按顺序执行与 flatMap 相同的操作的运算符,如果代码需要等待所有子查询完成,这会变得很慢执行。解决方案虽然比 concatMapEager 更进一步,这个解决方案将按顺序订阅 observable,并根据需要缓冲结果。

假设改造客户端是异步的或在特定的调度程序上运行:

api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()))
.subscribe(...)

或者如果必须在本地设置调度程序:

api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)

还可以在此运算符中调整并发性。


此外,如果 Api 返回 Flowable,则可以使用目前在 RxJava 2.1.7 中仍处于测试阶段的 .parallel。但是结果不按顺序排列,我不知道如何(还没有?)在不排序的情况下对它们进行排序。

api.items(queryParam) // Flowable<Item>
.parallel(10)
.runOn(Schedulers.io())
.map(item -> api2.extendedInfo(item.id()))
.sequential(); // Flowable<ItemExtended>

关于java - RxJava 组合请求序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47641605/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com