gpt4 book ai didi

spring - 从返回 future 的服务创建 Mono/Flux 的正确方法

转载 作者:行者123 更新时间:2023-12-05 06:37:07 25 4
gpt4 key购买 nike

我如何正确处理从 futures 构建的 Monos?

我正在努力了解 Spring Reactive(和 Spring 5),观看所有视频并阅读我能找到的所有博客,但他们似乎都没有做一些比查询数据库稍微多一点的事情,或其他一些微不足道的事情。

我正在使用新的 AWS 2.0 SDK,它使用 CompletableFuture 来处理大多数事情。使用服务创建新实例,我的方法如下所示

public Mono<RunInstancesResponse> create(Instance instance) {
RunInstancesRequest runInstancesRequest = RunInstancesRequest.builder()
.instanceType(instance.getInstanceType())
.imageId(instance.getImageId())
.securityGroupIds(instance.getSecurityGroupIds())
.keyName(instance.getKeyName())
.minCount(1)
.maxCount(1)
.tagSpecifications(createTags(instance))
.build();

CompletableFuture<RunInstancesResponse> future = client.runInstances(runInstancesRequest);

future.whenComplete((response, error) -> {
response.reservation().instances().stream().map(aws -> Instance.builder()
.imageId(aws.imageId())
.build()
).forEach(instanceRepository::save);

});

return Mono.fromFuture(future);
}

我的理解是我几乎立即返回类型为 RunInstancesResponseMono,而 future.whenComplete 会做这件事每当。

我从我的路由处理程序中调用它,看起来像

public Mono<ServerResponse> create(ServerRequest request) {
return request.bodyToMono(Instance.class)
.flatMap(createService::create)
.flatMap(i -> ServerResponse.accepted().build());
}

现在这几乎可以按我的预期工作,但是有几个关键问题我不知道如何解决。

1.) whenComplete 从未被调用,我相信这是因为我没有订阅它。

2.) 在 whenComplete 完成之前(大约 2.5 秒)服务器不会响应客户端,这并不理想,因为我希望它立即响应然后更新客户端当调用 whenComplete 时。

我感觉我的整个服务和处理程序的做事方式完全错误。

我喜欢一些示例,说明我应该如何处理服务中的 future,其中它是从具有 MonoFlux 类型的路由处理程序调用的。

最佳答案

我写了一个 open source library它包装了 SQS、SNS 和 DynamoDb,使这更容易一些。为避免仅链接答案,您可以对其应用以下内容:

public Mono<ChangeMessageVisibilityResult> changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) {
return Mono.create(subscriber -> amazonClient.changeMessageVisibilityAsync(queueUrl, receiptHandle, visibilityTimeout, AmazonWebServiceRequestAsyncHandler.valueEmittingHandlerFor(subscriber)));
}

传递的处理程序在两个世界之间转换:

public class AmazonWebServiceRequestAsyncHandler<RQ extends AmazonWebServiceRequest, RS> implements AsyncHandler<RQ, RS> {
private final MonoSink<? super RS> subscriber;
private boolean shouldEmitValue;

private AmazonWebServiceRequestAsyncHandler(MonoSink<? super RS> subscriber, boolean shouldEmitValue) {
this.subscriber = subscriber;
this.shouldEmitValue = shouldEmitValue;
}

@Override
public void onError(Exception exception) {
subscriber.error(exception);
}

@Override
public void onSuccess(RQ request, RS response) {
if (shouldEmitValue) {
subscriber.success(response);
} else {
subscriber.success();
}
}

public static <RQ extends AmazonWebServiceRequest, RS> AsyncHandler<RQ, RS> valueEmittingHandlerFor(final MonoSink<? super RS> subscriber) {
return new AmazonWebServiceRequestAsyncHandler<>(subscriber, true);
}

public static <RQ extends AmazonWebServiceRequest> AsyncHandler<RQ, Void> voidHandlerFor(MonoSink<? super Void> subscriber) {
return new AmazonWebServiceRequestAsyncHandler<>(subscriber, false);
}
}

关于spring - 从返回 future 的服务创建 Mono/Flux 的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48372032/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com