gpt4 book ai didi

Java react 器-链Mono 与另一个产生Mono 的异步任务
转载 作者:行者123 更新时间:2023-12-03 19:42:08 26 4
gpt4 key购买 nike

我有以下异步任务:

public class AsyncValidationTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
public Mono<Void> execute(Object o);
}

public class AsyncSaveTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono of Object
public Mono<Object> execute(Object o);
}

及以下服务类别:

public class AsyncService {

private AsyncValidationTask validation;

private AsyncSaveTask save;

public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
// Right now, the problem is that when validation completes successfully, it
// emits Mono.empty hence the flatMap chained below will not be invoked at all.
.flatMap(dontcare -> this.save.execute(o))
}
}

如上所示,如果 flatMap成功完成,我尝试使用 AsyncSaveTask.execute链接 AsyncValidationTask.execute调用,但由于完成后未发出任何内容(Mono.empty),因此无法正常工作。

我还考虑 then链接第二个调用,但是无论第一个验证调用产生了Mono.error,它将始终调用链接的调用。

如何正确链接它们?

最佳答案

.then,用于仅链接终端的源

使用.then,以便与您执行的过程一起执行,该过程仅发送终端信号。

另外,请注意,如果您需要对错误信号进行某些操作,则必须事先将.thenonErrorResume一起使用。

public class AsyncService {

private AsyncValidationTask validation;

private AsyncSaveTask save;

public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(this.save.execute(o))
}
}
.defer为了推迟单声道创建

为了仅在成功验证的情况下执行 this.save.execute(o),您还必须将其包装在 Mono.defer中:
public class AsyncService {

private AsyncValidationTask validation;

private AsyncSaveTask save;

public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(Mono.defer(() -> this.save.execute(o)))
}
}

通常不需要,因为 Mono LAZY 类型,只有在发生 订阅(预订== .subscribe())的情况下, 应该开始工作。
Mono#then 的实现可确保,即对 Mono返回的 this.save.execute的订阅将在方法, Mono.defer(() -> this.validation.execute(o)) 完成后开始。

执行可能较早开始的唯一原因可能是目的(例如,提供有意提供此类行为的业务逻辑-缓存,热源等) this.save.execute(o)的INCORRECT实现,无论实际订阅如何,它都会开始工作。

正确设计实现

通常,最好确保能够正常工作并将其作为 Publisher公开的API(例如 Mono | Flux)是惰性的。

这意味着API创建者必须确保仅在用户订阅了给定的 Publisher实例的情况下才执行工作。

例如,如果您的异步API在下面进行了 CompletableFuture的创建,则值得将 CompletableFuture的创建手动包装到 Mono.defer或使用适当的方法扩展名,例如 Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)
执行器示例

让我们考虑如何使常规ThreadPool任务提交处于 Activity 状态。
interface Executor  {
Future<T> execute(Callable<T> runnable);
}


因此,为了使 Executor具有反应性,我们必须创建类似以下内容的东西:
interface ReactiveExecutor {
Mono<T> execute(Callable<T> runnable);
}

实现不正确

以下是有效的此类适配器的可能实现:
class ReactiveExecutorAdapter {
final Executor delegate;

...


Mono<T> execute(Callable<T> runnable) {
MonoProcessor<T> result = MonoProcessor.create();
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
result.onNext(value);
result.onComplet();
return value;
});

return result.doOnCancel(() -> task.cancel());
}
}

无疑,这样的实现将起作用。但是,它存在一些关键问题:
  • 从方法调用开始执行(这有点与响应流的惰性行为矛盾Publisher)
  • 因为执行是在实际任务订阅之前开始的,所以我们必须创建一个有状态的Mono,它支持以后的订阅。
  • 根本没有订阅者(例如,执行已开始,但是没有.subscribe方法发生(然后我们发生了值泄漏,无法处理))时,此实现不处理这种情况
  • 一般而言,它太笨拙,无法解决。另外,为了防止前面提到的所有情况,有必要在实现之外用Mono execute(..)包装对Mono.defer的每次调用(请参阅问题中的原始问题)。随后,这导致了一个事实,即API用户可以轻松地“自发自足”,而无需使用额外的.defer
  • 来包装执行

    那么,如何解决呢?

    基本上,将 Mono.defer移动到库内部就足够了。这将使API用户的生活变得更加轻松,因为他们不必考虑何时需要使用延迟(因此,减少了可能的问题)。

    例如,对于我们的Reactive Executor,最简单的解决方案可以是以下方法:
    class ReactiveExecutorAdapter {
    final Executor delegate;

    ...


    Mono<T> execute(Callable<T> runnable) {
    Mono.defer(() -> {
    MonoProcessor<T> result = MonoProcessor.create();
    Future<T> task = delegate.execute(() -> {
    T value = runnable.call();
    result.onNext(value);
    result.onComplet();
    return value;
    });

    return result.doOnCancel(() -> task.cancel());
    })
    }
    }

    通过仅推迟执行,就可以确定解决至少一个问题-确保值(value)不再泄漏。

    但是,如何正确解决呢?

    但是,为了解决特定情况下的所有可能问题,我们可以使用为采用异步API设计的 Mono.create:
    class ReactiveExecutorAdapter {
    final Executor delegate;

    ...


    Mono<T> execute(Callable<T> runnable) {
    Mono.crete(monoSink -> {

    Future<T> task = delegate.execute(() -> {
    T value = runnable.call();
    monoSink.complete(value);
    });

    monoSink.doOnCancel(task::cancel);
    })
    }
    }

    使用 Mono.create,我们可以保证每个订阅者都可以延迟执行。
    另外,使用 MonoSink API,我们可以快速挂接到来自订户的所有基本信号。
    最后,Mono.create确保在任何情况下,该值都将被适本地丢弃。

    最后,有了这样的API,就不必在所有情况下都使用defer

    关于Java react 器-链Mono <Void>与另一个产生Mono <Object>的异步任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60836450/

    26 4 0