gpt4 book ai didi

spring-webflux - 如何使用 react 器有条件地重复或重试

转载 作者:行者123 更新时间:2023-12-04 13:55:01 27 4
gpt4 key购买 nike

我在 Webflux 中使用 SpringBoot 和响应式编程。我想重复该服务,直到数据可用(除空值外返回其他内容)
我有一个服务将一些数据插入到数据库中,并且有第二个服务消耗数据。
我想继续从第二个服务查询数据库,直到数据可用。下面的代码我试图使用 Project Reactor 实现这一点:

Mono<SubscriptionQueryResult<App, App>> subscriptionQuery = reactiveQueryGateway
.subscriptionQuery(new FindAppByIdQuery(appId), ResponseTypes.instanceOf(App.class), ResponseTypes.instanceOf(App.class));

subscriptionQuery
.filter(a -> Objects.nonNull(a.initialResult().block()))
.repeatWhen(Repeat.onlyIf(repeatContext -> true)
.exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(100))
.timeout(Duration.ofSeconds(30))).subscribe();
在执行此操作时,我遇到以下异常:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
在浏览 webflux 文档时,我发现在 Reactor 线程中无法调用 block() 函数。这样的尝试会导致上述错误:
为了克服我在下面尝试过的问题:
subscriptionQuery
.flatMap(a -> a.initialResult())
.filter(a -> Objects.nonNull(a))
.repeatWhen(Repeat.onlyIf(repeatContext -> true)
.exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(100))
.timeout(Duration.ofSeconds(30)))
.subscribe();
但它没有给我想要的结果,我想我错过了一些东西。任何人都可以请提出正确的方法来实现这一目标。
谢谢。

最佳答案

让我试着帮助你解决这个问题。
事实上,最好的解决方法是在发送命令之前订阅它。
通过这种方式,您知道订阅​​查询何时发出更新。
我们有一个 code-samples这可以帮助你。
对此,您最感兴趣的部分应该是 CommandController 上的这个部分。 :

public <U> Mono<U> sendAndReturnUpdate(Object command, SubscriptionQueryResult<?, U> result) {
/* The trick here is to subscribe to initial results first, even it does not return any result
Subscribing to initialResult creates a buffer for updates, even that we didn't subscribe for updates yet
they will wait for us in buffer, after this we can safely send command, and then subscribe to updates */
return Mono.when(result.initialResult())
.then(Mono.fromCompletionStage(() -> commandGateway.send(command)))
.thenMany(result.updates())
.timeout(Duration.ofSeconds(5))
.next()
.doFinally(unused -> result.cancel());
/* dont forget to close subscription query on the end and add a timeout */
}

关于spring-webflux - 如何使用 react 器有条件地重复或重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64105604/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com