gpt4 book ai didi

java - 如何正确读取 Flux 并将其转换为单个 inputStream

转载 作者:IT老高 更新时间:2023-10-28 13:50:37 42 4
gpt4 key购买 nike

我正在为我的 spring-boot 应用程序使用 WebClient 和自定义 BodyExtractor

WebClient webLCient = WebClient.create();
webClient.get()
.uri(url, params)
.accept(MediaType.APPLICATION.XML)
.exchange()
.flatMap(response -> {
return response.body(new BodyExtractor());
})

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
Flux<DataBuffer> body = response.getBody();
body.map(dataBuffer -> {
try {
JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();

return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
} catch(Exception e){
return null;
}
}).next();
}

上面的代码适用于小负载,但不适用于大负载,我认为这是因为我只用 next 读取单个通量值,我不知道如何组合和读取所有dataBuffer.

我是 reactor 的新手,所以我不知道很多有关通量/单声道的技巧。

最佳答案

这确实不像其他答案所暗示的那么复杂。

正如@jin-kwon 所建议的那样,流式传输数据而不将其全部缓冲在内存中的唯一方法是使用管道。但是,使用 Spring 的 BodyExtractors 可以非常简单地完成。和 DataBufferUtils实用程序类。

例子:

private InputStream readAsInputStream(String url) throws IOException {
PipedOutputStream osPipe = new PipedOutputStream();
PipedInputStream isPipe = new PipedInputStream(osPipe);

ClientResponse response = webClient.get().uri(url)
.accept(MediaType.APPLICATION.XML)
.exchange()
.block();
final int statusCode = response.rawStatusCode();
// check HTTP status code, can throw exception if needed
// ....

Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
.doOnError(t -> {
log.error("Error reading body.", t);
// close pipe to force InputStream to error,
// otherwise the returned InputStream will hang forever if an error occurs
try(isPipe) {
//no-op
} catch (IOException ioe) {
log.error("Error closing streams", ioe);
}
})
.doFinally(s -> {
try(osPipe) {
//no-op
} catch (IOException ioe) {
log.error("Error closing streams", ioe);
}
});

DataBufferUtils.write(body, osPipe)
.subscribe(DataBufferUtils.releaseConsumer());

return isPipe;
}

如果您不关心检查响应代码或为失败状态代码抛出异常,您可以跳过 block() 调用和中间 ClientResponse 变量通过使用

flatMap(r -> r.body(BodyExtractors.toDataBuffers()))

改为。

关于java - 如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46460599/

42 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com