gpt4 book ai didi

spring-boot - 使用 Project Reactor 中的 ExchangeFunction 从 Client Request 下载并保存文件

转载 作者:行者123 更新时间:2023-12-04 03:14:40 24 4
gpt4 key购买 nike

在 Project Reactor 中下载完成后,我无法正确保存文件。

class HttpImageClientDownloader implements ImageClientDownloader {

private final ExchangeFunction exchangeFunction;

HttpImageClientDownloader() {
this.exchangeFunction = ExchangeFunctions.create(new ReactorClientHttpConnector());
}

@Override
public Mono<File> downloadImage(String url, Path destination) {

ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create(url)).build();
return exchangeFunction.exchange(clientRequest)
.map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
//.flatMapMany(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
.flatMap(dataBuffer -> {

AsynchronousFileChannel fileChannel = createFile(destination);
return DataBufferUtils
.write(dataBuffer, fileChannel, 0)
.publishOn(Schedulers.elastic())
.doOnNext(DataBufferUtils::release)
.then(Mono.just(destination.toFile()));


});

}

private AsynchronousFileChannel createFile(Path path) {
try {
return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE);
} catch (Exception e) {
throw new ImageDownloadException("Error while creating file: " + path, e);
}
}
}

所以我的问题是:
DataBufferUtils.write(dataBuffer, fileChannel, 0) 是否阻塞?

磁盘慢时怎么办?

关于发生 ImageDownloadException 时会发生什么的第二个问题,
在 doOnNext 中我想释放给定的数据缓冲区,这是进行此类操作的好地方吗?

我也认为这一行:

            .map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))


可能会阻塞...

最佳答案

这是实现这一目标的另一种(较短)方法:

Flux<DataBuffer> data = this.webClient.get()
.uri("/greeting")
.retrieve()
.bodyToFlux(DataBuffer.class);

Path file = Files.createTempFile("spring", null);
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
Mono<File> result = DataBufferUtils.write(data, channel)
.map(DataBufferUtils::release)
.then(Mono.just(file));

现在 DataBufferUtils::write操作不会阻塞,因为它们使用非阻塞 IO 和 channel 。写入这些 channel 意味着它会向输出缓冲区写入任何可以写入的内容(即可以写入所有 DataBuffer 或仅写入其中的一部分)。

使用 Flux::mapFlux::doOnNext是正确的地方做到这一点。但是你是对的,如果发生错误,你仍然负责释放当前缓冲区(以及所有剩余的缓冲区)。 Spring Framework 中可能有一些我们可以改进的地方,请关注 SPR-16782 .

我没有看到您的上一个示例如何显示任何阻塞:所有方法都返回 react 类型,而没有一个方法正在执行阻塞 I/O。

关于spring-boot - 使用 Project Reactor 中的 ExchangeFunction 从 Client Request 下载并保存文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50091069/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com