- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在动态构建一个 tarball,并希望将其直接流式传输回来,使用 .tar.gz 应该是 100% 可能的。
通过大量谷歌搜索,下面的代码是我能得到的最接近 dataBuffer 的代码。基本上,我需要一些实现 OutputStream
的东西并提供或发布到 Flux<DataBuffer>
这样我就可以从我的方法中返回它,并有流输出,而不是在 ram 中缓冲整个 tarball(我很确定这就是这里发生的事情)。我正在使用 apache Compress-commons,它有一个很棒的 API,但它都是基于 OutputStream 的。
我想另一种方法是直接写入响应,但我认为这不是正确的 react 方式?不确定如何获得 OutputStream
来自某种 Response 对象。
顺便说一句,这是在 Spring Boot 2.0 上的 kotlin
@GetMapping("/cookbook.tar.gz", "/cookbook")
fun getCookbook(): Mono<DefaultDataBuffer> {
log.info("Creating tarball of cookbooks: ${soloConfig.cookbookPaths}")
val transformation = Mono.just(soloConfig.cookbookPaths.stream()
.toList()
.flatMap {
Files.walk(Paths.get(it)).map(Path::toFile).toList()
})
.map { files ->
//Will make one giant databuffer... but oh well? TODO: maybe use some kind of chunking.
val buffer = DefaultDataBufferFactory().allocateBuffer()
val outputBufferStream = buffer.asOutputStream()
//Transform my list of stuff into an archiveOutputStream
TarArchiveOutputStream(GzipCompressorOutputStream(outputBufferStream)).use { taos ->
taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU)
log.info("files to compress: ${files}")
for (file in files) {
if (file.isFile) {
val entry = "cookbooks/" + file.name
log.info("Adding ${entry} to tarball")
taos.putArchiveEntry(TarArchiveEntry(file, entry))
FileInputStream(file).use { fis ->
fis.copyTo(taos) //Copy that stuff!
}
taos.closeArchiveEntry()
}
}
}
buffer
}
return transformation
}
最佳答案
我很困惑,并且有一个有效的解决方案。您实现一个 OutputStream
并获取这些字节并将它们发布到一个流中。一定要覆盖关闭,并发送一个 onComplete。效果很好!
@RestController
class SoloController(
val soloConfig: SoloConfig
) {
val log = KotlinLogging.logger { }
@GetMapping("/cookbooks.tar.gz", "/cookbooks")
fun streamCookbook(serverHttpResponse: ServerHttpResponse): Flux<DataBuffer> {
log.info("Creating tarball of cookbooks: ${soloConfig.cookbookPaths}")
val publishingOutputStream = PublishingOutputStream(serverHttpResponse.bufferFactory())
//Needs to set up cookbook path as a parent directory, and then do `cookbooks/$cookbook_path/<all files>` for each cookbook path given
Flux.just(soloConfig.cookbookPaths.stream().toList())
.doOnNext { paths ->
//Transform my list of stuff into an archiveOutputStream
TarArchiveOutputStream(GzipCompressorOutputStream(publishingOutputStream)).use { taos ->
taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU)
paths.forEach { cookbookDir ->
if (Paths.get(cookbookDir).toFile().isDirectory) {
val cookbookDirFile = Paths.get(cookbookDir).toFile()
val directoryName = cookbookDirFile.name
val entryStart = "cookbooks/${directoryName}"
val files = Files.walk(cookbookDirFile.toPath()).map(Path::toFile).toList()
log.info("${files.size} files to compress")
for (file in files) {
if (file.isFile) {
val relativePath = file.toRelativeString(cookbookDirFile)
val entry = "$entryStart/$relativePath"
taos.putArchiveEntry(TarArchiveEntry(file, entry))
FileInputStream(file).use { fis ->
fis.copyTo(taos) //Copy that stuff!
}
taos.closeArchiveEntry()
}
}
}
}
}
}
.subscribeOn(Schedulers.parallel())
.doOnComplete {
publishingOutputStream.close()
}
.subscribe()
return publishingOutputStream.publisher
}
class PublishingOutputStream(bufferFactory: DataBufferFactory) : OutputStream() {
val publisher: UnicastProcessor<DataBuffer> = UnicastProcessor.create(Queues.unbounded<DataBuffer>().get())
private val bufferPublisher: UnicastProcessor<Byte> = UnicastProcessor.create(Queues.unbounded<Byte>().get())
init {
bufferPublisher
.bufferTimeout(4096, Duration.ofMillis(100))
.doOnNext { intList ->
val buffer = bufferFactory.allocateBuffer(intList.size)
buffer.write(intList.toByteArray())
publisher.onNext(buffer)
}
.doOnComplete {
publisher.onComplete()
}
.subscribeOn(Schedulers.newSingle("publisherThread"))
.subscribe()
}
override fun write(b: Int) {
bufferPublisher.onNext(b.toByte())
}
override fun close() {
bufferPublisher.onComplete() //which should trigger the clean up of the whole thing
}
}
}
关于spring-boot - 在 Spring Webflux 中如何从 `OutputStream` 到 `Flux<DataBuffer>`?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52882074/
我希望缓存Mono(仅在成功的情况下),这是WebClient调用的结果。 通过阅读项目 react 堆插件文档,我觉得CacheMono不太适合,因为它也存储了我不想要的错误。 因此,我没有使用Ca
我用 webflux 与 网易和 jdbc ,所以我以下一种方式包装阻塞 jdbc 操作: static Mono fromOne(Callable blockingOperation) {
有人可以告诉我或使用 提供现成的 CRUD 示例吗? WebFlux、RScoket 和 Spring(或 SpringBoot) ? 我研究了 RSocket 文档, WebFlux ,也写了我的简
我正在通过代理连接使用ssl服务测试webclient,但是使用安全ssl连接时出现以下错误。 你知道是什么问题吗? 堆栈跟踪: {"timestamp":"2019-10-29T18:35:43.5
public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { Flux body = exchange.
我创建了一个过滤器,我可以在其中访问有效负载的主体并对其执行一些逻辑(现在假设我记录主体)。在最后一步中,我返回了 Mono,但是当请求通过 Controller 继续发送到服务时,它会抛出请求正文丢
上次我在考虑在我们的应用程序中正确使用记录器。例如,我有一个返回用户流的 Controller ,但在日志中,我看到“获取用户”日志正在被另一个线程而不是处理管道上的线程记录,但这是一个好方法吗? @
我正在使用带有 Netty 的 Spring Webflux (2.0.3.RELEASE) 并尝试了解服务器和 Web 客户端如何使用线程。我用 WebClient 编写了一些带有 http 调用链
我面临一个问题。我正在使用 Spring Webflux 并行调用一些 API。如果任何子线程面临任何问题,它需要记录请求。现在的问题是,用于记录一个普通的 POJO 类,其中有一个静态方法通过 Ap
我试图用 JSP 配置 Spring WebFlux。我在 Spring WebFlux 中没有看到任何支持 JSTL View 的 View 类。这是否意味着我们不能使用 Spring WebFlu
我正在寻找一种在响应式(Reactive) API 中使用计划任务的方法。 我知道它使用线程池,所以它与 webflux 组件不太兼容。 你有同等的人来做这项工作吗? 非常感谢 萨维留 最佳答案 有几
我曾经调用 HttpServletRequest.getRemoteAddr() 来获取客户端 ip。 我想知道如何通过 ServerWebExchange 获得它。 我最好的猜测是: serve
我想使用 spring webflux 以 react 方式流式传输文件。 我的端点应该看起来更具体什么是对象的类型? @GetMapping("/file") Flux file() { /
我无法让我的响应式(Reactive)代码以一种常见的方式处理错误。理想的方式是使用可重用的组件,我可以将其作为依赖项添加到其他项目中。 过去,我们使用 @RestControllerAdvise 通
我们正在尝试对 Webflux 使用react。我们将 Jaegar 与 Istio 用于检测目的。 Jaegar 非常了解 Spring MVC 端点,但似乎对 WebFlux 根本不起作用。 我正
我是响应式(Reactive)编程和 Spring WebFlux 的新手。我想让我的 App 1 通过 Flux 发布 Server Sent 事件,我的 App 2 持续监听它。 我希望 Flux
我正在我的项目中尝试新的 ReactiveQuerydslPredicateExecutor 但我找不到 findAll(Predicate, Pageable) 的方法,就像我在 QueryDslP
我正在使用 Spring WebFlux webclient 进行 REST 调用。我已经在 3000 上配置了连接超时毫秒,相应地: WebClient webClient = WebClient.
我想测量使用 WebFlux 进行的一些异步调用的长度。我一直在阅读各种来源,据我所知,@Timed 注释与 AspectJ 一起工作,基本上只是在方法调用之前启动一个计时器,然后停止它。这显然不适用
我有一个 Reactor Kafka 应用程序,它无限期地使用来自主题的消息。我需要公开一个健康检查 REST 端点,它可以指示此过程的健康状况——主要是想知道 Kafka 接收器通量序列是否已终止,
我是一名优秀的程序员,十分优秀!