gpt4 book ai didi

java - 使用 RxJava -Observables 进行异步调用

转载 作者:行者123 更新时间:2023-11-30 03:29:05 24 4
gpt4 key购买 nike

我正在尝试通过实现 RxJava 来进行异步休息调用。下面是实现 -

final Observable<List<A>> observableA = Observable.create(new Observable.OnSubscribe<List<A>>() {
@Override
public void call(Subscriber<? super List<A>> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(//another Function call);
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});

final Observable<List<B>> observableB = Observable.create(new Observable.OnSubscribe<List<B>>() {
@Override
public void call(Subscriber<? super List<B>> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(//another Function call);
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});


Observable<List<C>> reservationObserv = Observable.zip(observableA, observableB, new Func2< List<A>, List<B> , List<C>>() {
@Override
public List<C> call(final List<A> a, final List<B> b) {
// Merge the response
return c;
}
});

到目前为止,ObservableA 首先执行,然后是 ObservableB。任何人都可以建议为什么调用不是异步的。

提前致谢。

当我按照以下方式执行时,先执行observableB,然后执行ObservableA

final Observable<List<A>> observableA = Observable.create(new  Observable.OnSubscribe<List<A>>() {

@Override
public void call(final Subscriber<? super List<A>> subscriber) {
Runnable run = new Runnable() {
@Override
public void run() {
// Delay of 1000ms
subscriber.onNext(//calling a method);
subscriber.onCompleted();
}
};
executorService.execute(run);
}
});

final Observable<List<B>> observableB = Observable.create(new Observable.OnSubscribe<List<B>>() {

@Override
public void call(final Subscriber<? super List<B>> subscriber) {
Runnable run = new Runnable() {
@Override
public void run() {
// No delay
subscriber.onNext(//calling a method);
subscriber.onCompleted();
}
};
executorService.execute(run);
}
});


Observable<List<C>> observableC = Observable.zip(observableA, observableB, new Func2< List<A>, List<B> , List<C>>() {
@Override
public List<C> call(final List<A> a, final List<B> b) {
// Merge the response
return c;
}
});

最佳答案

默认情况下,RxJava 是同步的。因此,在第一种情况下,zip 运算符将订阅 observableA,然后,当 observableA 完成时,将订阅 observableB

在第二种情况下,当您使用执行程序服务时,您实际上是异步的。

要与您的第一个版本异步,正如注释中所建议的那样,您应该查看 Schedulers 并告诉 RxJava 您的订阅应该在哪个 schdeulers 中执行。

Observable<List<C>> observableC = Observable.zip(
observableA.subscribeOn(Schedulers.io()),
observableB.subscribeOn(Schedulers.io()),
(a, b) -> /** ... **/);
observableC.subscribe();

您可以使用不同的调度程序,具体取决于您想要实现的目标。 (使用 Schedulers.io() 进行 I/O,...)

关于java - 使用 RxJava -Observables 进行异步调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29481661/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com