gpt4 book ai didi

java - RxJava : How to get all results AND errors from an Observable

转载 作者:行者123 更新时间:2023-12-01 21:27:32 28 4
gpt4 key购买 nike

我正在做一个涉及Hystrix的项目,我决定使用RxJava。现在,忘记 Hystrix 吧,因为我相信主要问题是我完全搞砸了正确编写 Observable 代码。

需要:我需要一种方法来返回一个代表多个可观察对象的可观察对象,每个可观察对象运行一个用户任务。我希望 Observable 能够返回任务的所有结果,甚至是错误。

问题:可观察流因错误而终止。如果我有三个任务,而第二个任务抛出异常,那么我永远不会收到第三个任务,即使它会成功。

我的代码:

public <T> Observable<T> observeManagedAsync(String groupName,List<EspTask<T>> tasks) {
return Observable
.from(tasks)
.flatMap(task -> {
try {
return new MyCommand(task.getTaskId(),groupName,task).toObservable().subscribeOn(this.schedulerFactory.get(groupName));
} catch(Exception ex) {
return Observable.error(ex);
}
});
}

鉴于 MyCommand 是一个扩展 HystrixObservableCommand 的类,它返回一个 Observable,因此不应考虑我所看到的问题。

尝试 1:

如上使用Observable.flatMap

  • 好:每个命令都安排在它自己的线程上,并且任务异步运行。
  • 坏:在第一个命令异常时,Observable 完成发出先前的成功结果并发出异常。任何正在执行的命令都会被忽略。

尝试 2:

使用Observable.concatMapDelayError而不是flatMap

  • 坏:由于某种原因,任务同步运行。为什么??
  • 好:我得到了所有成功的结果。
  • ~Good: OnError 获取一个复合异常,并包含抛出的异常列表。

任何帮助将不胜感激,并且可能会导致我因为自己没有想到而感到非常尴尬。

附加代码

此测试在使用 Observable.flatMap 时成功,但在使用 Observable.concatMapDelayError 时失败,因为任务不异步运行:

java.lang.AssertionError:执行时间超过 350 毫秒限制:608

@Test
public void shouldRunManagedAsyncTasksConcurrently() throws Exception {
Observable<String> testObserver = executor.observeManagedAsync("asyncThreadPool",getTimedTasks());
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
long startTime = System.currentTimeMillis();
testObserver.doOnError(throwable -> {
System.out.println("error: " + throwable.getMessage());
}).subscribe(testSubscriber);
System.out.println("Test execution time: "+(System.currentTimeMillis()-startTime));
testSubscriber.awaitTerminalEvent();
long execTime = (System.currentTimeMillis()-startTime);
System.out.println("Test execution time: "+execTime);
testSubscriber.assertCompleted();
System.out.println("Errors: "+testSubscriber.getOnErrorEvents());
System.out.println("Results: "+testSubscriber.getOnNextEvents());
testSubscriber.assertNoErrors();
assertTrue("Execution time ran under the 300ms limit: "+execTime,execTime>=300);
assertTrue("Execution time ran over the 350ms limit: "+execTime,execTime<=350);
testSubscriber.assertValueCount(3);
assertThat(testSubscriber.getOnNextEvents(),containsInAnyOrder("hello","wait","world"));
verify(this.mockSchedulerFactory, times(3)).get("asyncThreadPool");
}

上述单元测试的任务:

protected List<EspTask<String>> getTimedTasks() {
EspTask longTask = new EspTask("helloTask") {
@Override
public Object doCall() throws Exception {
Thread.currentThread().sleep(100);
return "hello";
}
};
EspTask longerTask = new EspTask("waitTask") {
@Override
public Object doCall() throws Exception {
Thread.currentThread().sleep(150);
return "wait";
}

};
EspTask longestTask = new EspTask("worldTask") {
@Override
public Object doCall() throws Exception {
Thread.currentThread().sleep(300);
return "world";
}
};
return Arrays.asList(longTask, longerTask, longestTask);
}

最佳答案

您可以使用Observable.onErrorReturn(),并返回特殊值(例如null),然后在下游过滤非特殊值。请记住,源 observable 将在出现错误时完成。另外,根据用例,Observable.onErrorResumeNext()方法也可能很有用。如果您对错误通知感兴趣,请使用 Observable.materialize(),这会将项目和 onError()onComplete() 转换为通知,然后可以通过 Notification.getKind()

进行过滤

编辑。假设 try/catch 不存在,上面提到的所有运算符都应该添加在 .toObservable().subscribeOn(this.schedulerFactory.get(groupName)); 之后。

关于java - RxJava : How to get all results AND errors from an Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37866053/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com