gpt4 book ai didi

rx-java - 如何在 RxJava 中延迟 Observable 发射

转载 作者:行者123 更新时间:2023-12-02 18:18:34 26 4
gpt4 key购买 nike

我们有微服务架构,我们通过网络进行服务间调用。我们在顶层服务中使用 RxJava,这会导致向底层服务创建大量并行请求。因此,我收到“没有到主机的路由错误”或“连接错误”。为此,我想减慢 RxJava Observable 的发射速度,以便在创建新连接之前先关闭较早的连接。下面是示例代码:

    package com.demo.rxjava.rxjaxa.creation;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class Delay {

public static void main(String[] args) throws InterruptedException {
Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io())
.flatMap(integer -> {
return function1(integer);
}).observeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
}

public Observable<String> function1(String id) {
// This is where we make network call
Observable<Response> response = Rx.newClient(RxObservableInvoker.class)
.target("http://example.com/resource")
.request()
.queryParam("id", id)
.rx()
.get();
response.obserOn(Schedulers.from(threadExecutor)).flatMap(response->{
return response.extractResponse();
});
}
}

最佳答案

您应该让对底层服务的请求发生在限制并行事件的调度程序上,而不是延迟您的请求。例如:

 int maxParallel = 4;
Scheduler scheduler = Schedulers.from(
Executors.newFixedThreadPool(maxParallel));
...
observable
.flatMap(x ->
submitToBottomService(x)
.subscribeOn(scheduler))
.subscribe(subscriber);

顺便说一句,你提到关闭连接。 Observable.using 运算符旨在关闭响应式(Reactive)上下文中的资源(它在终止和取消订阅时关闭资源)。如果您还没有使用它,请看一下。

关于rx-java - 如何在 RxJava 中延迟 Observable 发射,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38164006/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com