java - How to asynchronously put an object to S3 using Webflux?

Reposted. Author: 行者123. Updated: 2023-12-01 16:20:05

The article AWS S3 with Java – Reactive describes how to use the AWS SDK 2.0 client with Webflux.

In its example, the following handler uploads to S3 and then returns an HTTP Created response:

@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers,
        @RequestBody Flux<ByteBuffer> body) {

    long length = headers.getContentLength();
    String fileKey = UUID.randomUUID().toString();
    Map<String, String> metadata = new HashMap<String, String>();

    // putObject returns immediately; the upload itself runs asynchronously.
    CompletableFuture<PutObjectResponse> future = s3client
        .putObject(PutObjectRequest.builder()
                .bucket(s3config.getBucket())
                .contentLength(length)
                .key(fileKey.toString())
                .contentType(MediaType.APPLICATION_OCTET_STREAM.toString())
                .metadata(metadata)
                .build(),
            AsyncRequestBody.fromPublisher(body));

    // The HTTP response is produced only after the S3 future completes.
    return Mono.fromFuture(future)
        .map((response) -> {
            checkResult(response);
            return ResponseEntity
                .status(HttpStatus.CREATED)
                .body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
        });
}

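This works as expected. While trying to learn WebFlux, I expected the following version to complete the HTTP upload to S3 asynchronously, on the same thread that calls the subscribe method: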

@PostMapping
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers, @RequestBody Flux<ByteBuffer> body) {

    long length = headers.getContentLength();
    String fileKey = UUID.randomUUID().toString();
    Map<String, String> metadata = new HashMap<String, String>();

    Mono<PutObjectResponse> putObjectResponseMono = Mono.fromFuture(s3client
        .putObject(PutObjectRequest.builder()
                .bucket(s3config.getBucket())
                .contentLength(length)
                .key(fileKey.toString())
                .contentType(MediaType.APPLICATION_OCTET_STREAM.toString())
                .metadata(metadata)
                .build(),
            AsyncRequestBody.fromPublisher(body)));

    putObjectResponseMono
        .doOnError((e) -> {
            log.error("Error putting object to S3 " + Thread.currentThread().getName(), e);
        })
        .subscribe((response) -> {
            log.info("Response from S3: " + response.toString() + "on " + Thread.currentThread().getName());
        });

    return Mono.just(ResponseEntity
        .status(HttpStatus.CREATED)
        .body(new UploadResult(HttpStatus.CREATED, new String[]{fileKey})));
}

The HTTP POST completes as expected, but the S3 put request fails with the following log message:

2020-06-10 12:31:22.275 ERROR 800 --- [tyEventLoop-0-4] c.b.aws.reactive.s3.UploadResource       : Error happened on aws-java-sdk-NettyEventLoop-0-4

software.amazon.awssdk.core.exception.SdkClientException: 400 BAD_REQUEST "Request body is missing: public reactor.core.publisher.Mono<org.springframework.http.ResponseEntity<com.baeldung.aws.reactive.s3.UploadResult>> com.baeldung.aws.reactive.s3.UploadResource.uploadHandler(org.springframework.http.HttpHeaders,reactor.core.publisher.Flux<java.nio.ByteBuffer>)"
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97) ~[sdk-core-2.10.27.jar:na]
at software.amazon.awssdk.core.internal.util.ThrowableUtils.asSdkException(ThrowableUtils.java:98) ~[sdk-core-2.10.27.jar:na]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:125) ~[sdk-core-2.10.27.jar:na]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:107) ~[sdk-core-2.10.27.jar:na]
........

I suspect the explanation involves the request to S3 running on its own thread, but I am having trouble working out what went wrong. Can you explain?

Best answer

Try this:

In the handler signature, replace
    @RequestBody Flux<ByteBuffer> body
with
    @RequestBody byte[] body

In the putObject call, replace
    AsyncRequestBody.fromPublisher(body)
with
    AsyncRequestBody.fromBytes(body)
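
A likely reading of the "Request body is missing" error is that the streamed request body is only read when the S3 client subscribes to it, and in the fire-and-forget handler the HTTP response has already been completed by then. Buffering the body as byte[] avoids that dependency. The sketch below models this with the JDK only. `LazyBody`, `upload`, and `sdkReady` are hypothetical stand-ins, not Spring WebFlux or AWS SDK classes, and the real frameworks may differ in detail.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Stdlib-only model of the three handler variants. All names here are
// hypothetical stand-ins, not Spring WebFlux or AWS SDK classes.
class LazyBodyDemo {

    /** A streamed request body that can be read only while the exchange is open. */
    static final class LazyBody {
        private final byte[] data;
        private volatile boolean exchangeOpen = true;

        LazyBody(String text) {
            this.data = text.getBytes(StandardCharsets.UTF_8);
        }

        byte[] read() {
            if (!exchangeOpen) {
                throw new IllegalStateException("Request body is missing");
            }
            return data;
        }

        void closeExchange() {
            exchangeOpen = false;
        }
    }

    /** Fake async client: the body is read only once 'sdkReady' completes. */
    static CompletableFuture<Integer> upload(Supplier<byte[]> body, CompletableFuture<Void> sdkReady) {
        return sdkReady.thenApply(ignored -> body.get().length);
    }

    /** Turns the upload outcome into a short description. */
    static String describe(CompletableFuture<Integer> uploaded) {
        return uploaded
            .handle((n, e) -> {
                if (e == null) {
                    return "uploaded " + n + " bytes";
                }
                Throwable root = e.getCause() != null ? e.getCause() : e;
                return "failed: " + root.getMessage();
            })
            .join();
    }

    /** The response depends on the upload, so the exchange stays open until it finishes. */
    static String chained() {
        LazyBody body = new LazyBody("hello");
        CompletableFuture<Void> sdkReady = new CompletableFuture<>();
        CompletableFuture<Integer> uploaded = upload(body::read, sdkReady);
        CompletableFuture<String> response = uploaded.thenApply(n -> "201 Created");
        response.whenComplete((r, e) -> body.closeExchange());
        sdkReady.complete(null); // the client starts reading the body now
        return describe(uploaded);
    }

    /** The response completes at once, so the exchange closes before the body is read. */
    static String fireAndForget() {
        LazyBody body = new LazyBody("hello");
        CompletableFuture<Void> sdkReady = new CompletableFuture<>();
        CompletableFuture<Integer> uploaded = upload(body::read, sdkReady);
        CompletableFuture<String> response = CompletableFuture.completedFuture("201 Created");
        response.whenComplete((r, e) -> body.closeExchange()); // runs immediately
        sdkReady.complete(null);
        return describe(uploaded);
    }

    /** The body is copied into memory first, so closing the exchange does not matter. */
    static String buffered() {
        LazyBody body = new LazyBody("hello");
        byte[] bytes = body.read();
        CompletableFuture<Void> sdkReady = new CompletableFuture<>();
        CompletableFuture<Integer> uploaded = upload(() -> bytes, sdkReady);
        body.closeExchange(); // response already sent
        sdkReady.complete(null);
        return describe(uploaded);
    }

    public static void main(String[] args) {
        System.out.println("chained:       " + chained());
        System.out.println("fireAndForget: " + fireAndForget());
        System.out.println("buffered:      " + buffered());
    }
}
```

The trade-off of the buffered variant is memory: the whole upload is held in a byte array, which may not suit large files.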

If you want to subscribe from another thread, use: .subscribeOn({Schedulers})
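
On the threading part of the question: the thread name in the log (aws-java-sdk-NettyEventLoop-0-4) is consistent with how CompletableFuture callbacks generally behave. A callback registered before the future completes runs on the thread that completes it, not on the thread that registered it. The following JDK-only sketch shows this. The thread name `sdk-event-loop-1` is made up for illustration, and Reactor's `Mono.fromFuture` is assumed, not shown, to follow the same pattern.

```java
import java.util.concurrent.CompletableFuture;

class CallbackThreadDemo {

    /** Returns the name of the thread on which the callback actually ran. */
    static String callbackThread() {
        CompletableFuture<String> future = new CompletableFuture<>();

        // Registered on the calling thread, while the future is still incomplete.
        CompletableFuture<String> seen =
            future.thenApply(value -> Thread.currentThread().getName());

        // Another thread completes the future, much like an SDK event loop would.
        new Thread(() -> future.complete("done"), "sdk-event-loop-1").start();

        return seen.join();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + callbackThread());
    }
}
```

In Reactor, subscribeOn changes the thread on which subscription happens, while publishOn moves the downstream signals to a scheduler, so the latter is usually the one that changes where a response callback runs.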

For more on "java - How to asynchronously put an object to S3 using Webflux?", see the similar question we found on Stack Overflow: https://stackoverflow.com/questions/62302733/
