
spring-boot - How do I go from an `OutputStream` to a `Flux<DataBuffer>` in Spring WebFlux?

Reposted. Author: 行者123. Last updated: 2023-12-05 06:29:57

I'm building a tarball on the fly and want to stream it straight back to the client. With .tar.gz this should be 100% possible.

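After a lot of googling, the code below, which uses a DataBuffer, is the closest I have been able to get. Basically, I need something that implements OutputStream and feeds or publishes into a Flux<DataBuffer>, so that I can return it from my method and have the output streamed instead of buffering the entire tarball in RAM (which I'm fairly sure is what happens here). I'm using Apache Commons Compress, which has a great API, but it is entirely OutputStream based.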

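I suppose another option would be to write directly to the response, but I don't think that is the proper reactive way. I'm also not sure how I would get an OutputStream from some kind of Response object.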

By the way, this is Kotlin on Spring Boot 2.0.

@GetMapping("/cookbook.tar.gz", "/cookbook")
fun getCookbook(): Mono<DefaultDataBuffer> {
    log.info("Creating tarball of cookbooks: ${soloConfig.cookbookPaths}")

    val transformation = Mono.just(soloConfig.cookbookPaths.stream()
            .toList()
            .flatMap {
                Files.walk(Paths.get(it)).map(Path::toFile).toList()
            })
            .map { files ->

                //Will make one giant databuffer... but oh well? TODO: maybe use some kind of chunking.
                val buffer = DefaultDataBufferFactory().allocateBuffer()
                val outputBufferStream = buffer.asOutputStream()

                //Transform my list of stuff into an archiveOutputStream
                TarArchiveOutputStream(GzipCompressorOutputStream(outputBufferStream)).use { taos ->
                    taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU)

                    log.info("files to compress: ${files}")

                    for (file in files) {
                        if (file.isFile) {
                            val entry = "cookbooks/" + file.name
                            log.info("Adding ${entry} to tarball")
                            taos.putArchiveEntry(TarArchiveEntry(file, entry))
                            FileInputStream(file).use { fis ->
                                fis.copyTo(taos) //Copy that stuff!
                            }
                            taos.closeArchiveEntry()
                        }
                    }
                }
                buffer
            }

    return transformation
}

Best answer

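I puzzled over this for a while and now have a working solution. You implement an OutputStream that takes the bytes written to it and publishes them to a Flux. Be sure to override close and send an onComplete from it. Works great!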

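import java.io.FileInputStream
import java.io.OutputStream
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Duration
import kotlin.streams.toList
import mu.KotlinLogging
import org.apache.commons.compress.archivers.tar.TarArchiveEntry
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DataBufferFactory
import org.springframework.http.server.reactive.ServerHttpResponse
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Flux
import reactor.core.publisher.UnicastProcessor
import reactor.core.scheduler.Schedulers
import reactor.util.concurrent.Queues

@RestController
class SoloController(
        val soloConfig: SoloConfig
) {
    val log = KotlinLogging.logger { }

    @GetMapping("/cookbooks.tar.gz", "/cookbooks")
    fun streamCookbook(serverHttpResponse: ServerHttpResponse): Flux<DataBuffer> {
        log.info("Creating tarball of cookbooks: ${soloConfig.cookbookPaths}")

        val publishingOutputStream = PublishingOutputStream(serverHttpResponse.bufferFactory())

        //Needs to set up cookbook path as a parent directory, and then do `cookbooks/$cookbook_path/<all files>` for each cookbook path given
        Flux.just(soloConfig.cookbookPaths.stream().toList())
                .doOnNext { paths ->
                    //Transform my list of stuff into an archiveOutputStream
                    //Closing taos via use also closes the gzip stream and publishingOutputStream
                    TarArchiveOutputStream(GzipCompressorOutputStream(publishingOutputStream)).use { taos ->
                        taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU)

                        paths.forEach { cookbookDir ->
                            if (Paths.get(cookbookDir).toFile().isDirectory) {

                                val cookbookDirFile = Paths.get(cookbookDir).toFile()
                                val directoryName = cookbookDirFile.name
                                val entryStart = "cookbooks/${directoryName}"

                                val files = Files.walk(cookbookDirFile.toPath()).map(Path::toFile).toList()

                                log.info("${files.size} files to compress")

                                for (file in files) {
                                    if (file.isFile) {
                                        val relativePath = file.toRelativeString(cookbookDirFile)
                                        val entry = "$entryStart/$relativePath"
                                        taos.putArchiveEntry(TarArchiveEntry(file, entry))
                                        FileInputStream(file).use { fis ->
                                            fis.copyTo(taos) //Copy that stuff!
                                        }
                                        taos.closeArchiveEntry()
                                    }
                                }
                            }
                        }
                    }
                }
                //Note: the work above is blocking file I/O, so Schedulers.elastic() may be a better fit than parallel()
                .subscribeOn(Schedulers.parallel())
                .doOnComplete {
                    publishingOutputStream.close()
                }
                .subscribe()

        return publishingOutputStream.publisher
    }

    class PublishingOutputStream(bufferFactory: DataBufferFactory) : OutputStream() {

        //Downstream publisher of DataBuffers that the controller returns
        val publisher: UnicastProcessor<DataBuffer> = UnicastProcessor.create(Queues.unbounded<DataBuffer>().get())
        //Receives every single byte written to this stream
        private val bufferPublisher: UnicastProcessor<Byte> = UnicastProcessor.create(Queues.unbounded<Byte>().get())

        init {
            bufferPublisher
                    //Group bytes into lists of up to 4096, or whatever arrived within 100 ms
                    .bufferTimeout(4096, Duration.ofMillis(100))
                    .doOnNext { intList ->
                        val buffer = bufferFactory.allocateBuffer(intList.size)
                        buffer.write(intList.toByteArray())
                        publisher.onNext(buffer)
                    }
                    .doOnComplete {
                        publisher.onComplete()
                    }
                    .subscribeOn(Schedulers.newSingle("publisherThread"))
                    .subscribe()
        }

        override fun write(b: Int) {
            bufferPublisher.onNext(b.toByte())
        }

        override fun close() {
            bufferPublisher.onComplete() //which should trigger the clean up of the whole thing
        }
    }
}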
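
The answer above pushes every byte through a UnicastProcessor<Byte> as a boxed value and regroups the bytes afterwards. One possible variation, shown here only as a sketch, is to collect the bytes into fixed-size chunks inside the OutputStream itself and hand over whole arrays. The names ChunkingOutputStream, onChunk and onClose are hypothetical and do not come from the original answer. The sketch uses only the JDK, so wiring it to Reactor is left as an assumption: onChunk could, for example, wrap each array with bufferFactory.wrap(bytes) and pass it to a sink, and onClose could complete that sink.

```kotlin
import java.io.OutputStream

/**
 * Hypothetical helper (not part of the original answer): collects written bytes
 * into chunks of at most [chunkSize] bytes and hands each chunk to [onChunk].
 * [onClose] runs once, after the last partial chunk has been emitted.
 */
class ChunkingOutputStream(
    private val chunkSize: Int = 4096,
    private val onChunk: (ByteArray) -> Unit,
    private val onClose: () -> Unit
) : OutputStream() {

    private val chunk = ByteArray(chunkSize)
    private var filled = 0
    private var closed = false

    init {
        require(chunkSize > 0) { "chunkSize must be positive" }
    }

    override fun write(b: Int) {
        chunk[filled++] = b.toByte()
        if (filled == chunkSize) emit()
    }

    // Bulk writes are copied straight into the chunk instead of going byte by byte.
    override fun write(b: ByteArray, off: Int, len: Int) {
        var offset = off
        var remaining = len
        while (remaining > 0) {
            val n = minOf(remaining, chunkSize - filled)
            System.arraycopy(b, offset, chunk, filled, n)
            filled += n
            offset += n
            remaining -= n
            if (filled == chunkSize) emit()
        }
    }

    // Flushing emits whatever has been collected so far, even if the chunk is not full.
    override fun flush() {
        if (filled > 0) emit()
    }

    // Safe to call more than once; only the first call emits and signals completion.
    override fun close() {
        if (closed) return
        closed = true
        flush()
        onClose()
    }

    private fun emit() {
        onChunk(chunk.copyOf(filled)) // copy, because the internal array is reused
        filled = 0
    }
}

// Small usage demo: records the size of each chunk, then -1 when the stream closes.
fun demo(): List<Int> {
    val sizes = mutableListOf<Int>()
    ChunkingOutputStream(
        chunkSize = 4,
        onChunk = { sizes += it.size },
        onClose = { sizes += -1 }
    ).use { out ->
        out.write(ByteArray(10))
    }
    return sizes
}
```

Because close is idempotent here, it does not matter that both the use block and a later doOnComplete handler might call it, as happens in the answer's controller.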

Regarding "spring-boot - How do I go from an `OutputStream` to a `Flux<DataBuffer>` in Spring WebFlux?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/52882074/
