gpt4 book ai didi

java - RxJava : OnErrorFailedException. 确定正确原因

转载 作者:行者123 更新时间:2023-12-01 20:23:37 24 4
gpt4 key购买 nike

受到 T.Nurkiewicz 的“RxJava 响应式编程”的启发,我尝试将其应用到我正在开发的项目中,这就是我面临的问题。

我有一个休息端点,它接受输入流和用户名,并返回更新后的用户名的链接或返回错误请求错误。以下是我尝试使用 RxJava 实现此功能的方法:

    @PUT
@Path("{username}")
public Response updateCredential(@PathParam("username") final String username, InputStream stream) {
CredentialCandidate candidate = new CredentialCandidate();
Observable.just(repository.getByUsername(username))
.subscribe(
credential -> {
serializeCandidate(candidate, stream);
try {
repository.updateCredential(build(credential, candidate));
} catch (Exception e) {
String msg = "Failed to update credential +\""+username+"\": "+e.getMessage();
throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build());
}
},
ex -> {
String msg = "Couldn't update credential \""+username+"\""
+ ". A credential with such username doesn't exist: " + ex.getMessage();
logger.error(msg);
throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build());
});//if the Observable completes without exceptions we have a success case
Map<String, String> map = new HashMap<>();
map.put("path", "credential/" + username);
return Response.ok(getJsonRepr("link", uriGenerator.apply(appsUriBuilder, map).toASCIIString())).build();
}

我的问题出在第 11 行(onNext 方法的 catch 子句)。这是日志输出,将快速演示发生的情况:

19:23:50.472 [http-listener(4)] ERROR com.vgorcinschi.rimmanew.rest.services.CredentialResourceService             - Couldn't update credential "admin". A credential with such username doesn't exist: Failed to update credential +"admin": Password too weak! 

因此,在 onNext 方法中抛出的异常会转到上游并最终在 onError 方法中!显然this works as designed ,但我对如何返回错误请求错误的正确原因感到困惑。毕竟在我的测试用例中,存储库找到了用户的凭据,正确的错误是建议的密码太弱。这是生成错误的辅助方法:

private Credential build(Credential credential, CredentialCandidate candidate) {
if(!isOkPsswd.test(candidate.getPassword())){
throw new BadRequestException("Password too weak!", Response.status(Response.Status.BAD_REQUEST).build());
}
...
}

我对响应式(Reactive)编程还很陌生,所以我意识到我可能遗漏了一些明显的东西。浏览这本书并没有让我找到答案,所以我将不胜感激任何帮助。

以防万一,这是完整的堆栈跟踪:

updateCredentialTest(com.vgorcinschi.rimmanew.services.CredentialResourceServiceTest)  Time elapsed: 0.798 sec  <<< ERROR!
rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError
at com.vgorcinschi.rimmanew.rest.services.CredentialResourceService.lambda$updateCredential$9(CredentialResourceService.java:245)
at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:276)
at rx.Subscriber.setProducer(Subscriber.java:209)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:138)
at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:129)
at rx.Observable.subscribe(Observable.java:10238)
at rx.Observable.subscribe(Observable.java:10205)
at rx.Observable.subscribe(Observable.java:10045)
at com.vgorcinschi.rimmanew.rest.services.CredentialResourceService.updateCredential(CredentialResourceService.java:238)
at com.vgorcinschi.rimmanew.services.CredentialResourceServiceTest.updateCredentialTest(CredentialResourceServiceTest.java:140)

最佳答案

看来你没有正确掌握响应式编程原理。

第一件事是 Observable 的 API 是异步的,而您试图通过尝试直接从方法返回 Response 值来强制其为同步 API,而不是返回随时间推移发出此 Observable<Response> 值的 Response它的 onNext() 通知。
这就是为什么您正在与异常作斗争,每个通知 lambda 方法( onNext/onError )都由 Observable 机制封装,为了创建遵守某些规则( Observable contract )的适当流,其中一些预期行为是错误应该被重定向到 onError() 方法,这是异常 catch 方法,你不应该在那里抛出,在那里抛出将被视为 fatal error ,并会被抛出 OnErrorFailedException 吞没。

理想情况下,它会是这样的:

public Observable<Response> updateCredential(@PathParam("username") final String username,
InputStream stream) {
rerurn Observable.fromCallable(() -> {
CredentialCandidate candidate = new CredentialCandidate();
Credential credential = repository.getByUsername(username);
serializeCandidate(candidate, stream);
repository.updateCredential(build(credential, candidate));
Map<String, String> map = new HashMap<>();
map.put("path", "credential/" + username);
return Response.ok(getJsonRepr("link", uriGenerator.apply(appsUriBuilder, map).toASCIIString())).build();
})
.onErrorReturn(throwable -> {
String msg = "Failed to update credential +\"" + username + "\": " + e.getMessage();
throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build());
});
}

使用 fromCallable 以便在订阅时发生请求(而 Observable.just(repository.getByUsername(username)) 将在构造 Observable 时同步操作),成功路径与可调用本身一起,而如果发生任何错误,您将其转换为您的自定义异常使用 onErrorReturn 运算符。

使用他的方法,您将返回 Observable 对象,该对象将在您订阅它时起作用,您将获得 Observable 和响应式方法的所有好处,例如能够将其与其他一些操作组合,能够从外部指定是否它将同步操作(当前线程)或在其他线程上异步操作(使用 Scheduler )。

有关响应式(Reactive)编程的更详细解释,我建议从 André Staltz 的这个伟大的 tutorial 开始。

关于java - RxJava : OnErrorFailedException. 确定正确原因,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44233408/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com