I have a list of Source[ByteString, NotUsed] paired with filenames from an S3 bucket. These need to be zipped in constant memory and served from Play 2.6.
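Concretely, the shape of the problem is something like the following (my own sketch of the setup; zipAll is the function being sought, not an existing API):

import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.util.ByteString

// What I have: S3 object keys paired with lazily created byte sources.
val inputs: List[(String, () => Source[ByteString, NotUsed])] = ???

// What I need: a single source emitting the bytes of a zip archive of all
// inputs, in constant memory, so it can be served with Ok.chunked in Play 2.6.
def zipAll(inputs: List[(String, () => Source[ByteString, NotUsed])]): Source[ByteString, NotUsed] = ???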
There is a somewhat similar question here: stream a zip created on the fly with play 2.5 and akka stream with backpressure

The relevant code snippet using Akka Streams (needed for Play 2.6+) is here:
https://gist.github.com/kirked/03c7f111de0e9a1f74377bf95d3f0f60

My experiments so far are based on that gist, but the gist solves a different problem: it streams files from disk by handing the graph stage a function that produces an InputStream. There is, however, no safe way to convert my Source[ByteString, NotUsed] into an InputStream, so I cannot use the snippet as-is.
My experiment so far has been to change the input type from () => InputStream to () => Source[ByteString, NotUsed] and then to consume it with source.runForeach(...).

Most of my changes are here:
override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val src: Source[ByteString, NotUsed] = source()
  val operation = src.runForeach(bytestring => {
    val byteInputStream = new ByteArrayInputStream(bytestring.toArray)
    emitMultiple(out, fileChunks(byteInputStream, buffer))
  })
  operation.onComplete {
    case _ => buffer.endEntry()
  }
  Await.ready(operation, 5.minute)
}
I have also experimented with materializing each source as an InputStream instead:

override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val stream = source().runWith(StreamConverters.asInputStream(1.minute))
  currentStream = Some(stream)
  emitMultiple(out, fileChunks(stream, buffer), () => buffer.endEntry())
}
This version fails with the following error:

[ERROR] [11/27/2017 09:26:38.428] [alpakka-akka.actor.default-dispatcher-3] [akka://alpakka/user/StreamSupervisor-0/flow-0-0-headSink] Error in stage [com.company.productregistration.services.s3.StreamedZip@7f573427]: Reactive stream is terminated, no reads are possible
java.io.IOException: Reactive stream is terminated, no reads are possible
  at akka.stream.impl.io.InputStreamAdapter.subscriberClosedException(InputStreamSinkStage.scala:117)
  at akka.stream.impl.io.InputStreamAdapter.executeIfNotClosed(InputStreamSinkStage.scala:125)
  at akka.stream.impl.io.InputStreamAdapter.read(InputStreamSinkStage.scala:144)
  at com.company.productregistration.services.s3.StreamedZip$$anon$2.result$1(StreamedZip.scala:99)
  at com.company.productregistration.services.s3.StreamedZip$$anon$2.$anonfun$fileChunks$1(StreamedZip.scala:105)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
  at scala.collection.immutable.StreamIterator.$anonfun$next$1(Stream.scala:1058)
  at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:1047)
  at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:1047)
  at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:1052)
  at akka.stream.stage.GraphStageLogic$EmittingIterator.onPull(GraphStage.scala:911)
  at akka.stream.impl.fusing.GraphInterpreter.processPull(GraphInterpreter.scala:506)
  at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:412)
  at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:571)
  at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:541)
  at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:659)
  at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:707)
  at akka.actor.Actor.aroundPreStart(Actor.scala:522)
  at akka.actor.Actor.aroundPreStart$(Actor.scala:522)
  at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:650)
  at akka.actor.ActorCell.create(ActorCell.scala:591)
  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:484)
  at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
  at akka.dispatch.Mailbox.run(Mailbox.scala:223)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
If I remove the line currentStream = Some(stream), I do not get the above error. It also works for some combinations of files: I have one larger file of about 20 megabytes that corrupts the zip if it is the last source in the list, but placed anywhere else in the list of sources, everything works correctly.

Here is my complete current implementation:
import java.io.{ByteArrayInputStream, InputStream, OutputStream}
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.{ByteString, ByteStringBuilder}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal
//scalastyle:off
class StreamedZip(bufferSize: Int = 64 * 1024)(implicit ec: ExecutionContext,
                                               mat: ActorMaterializer)
    extends GraphStage[FlowShape[StreamedZip.ZipSource, ByteString]] {

  import StreamedZip._

  val in: Inlet[ZipSource] = Inlet("StreamedZip.in")
  val out: Outlet[ByteString] = Outlet("StreamedZip.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      private val buffer = new ZipBuffer(bufferSize)
      private var currentStream: Option[InputStream] = None

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit =
            if (isClosed(in)) {
              if (buffer.isEmpty) completeStage()
              else {
                buffer.close
                push(out, buffer.toByteString)
              }
            } else pull(in)

          override def onDownstreamFinish(): Unit = {
            closeInput()
            buffer.close
            super.onDownstreamFinish()
          }
        }
      )

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val (filepath, source: StreamGenerator) = grab(in)
            buffer.startEntry(filepath)
            val stream = source().runWith(StreamConverters.asInputStream(1.minute))
            emitMultiple(out, fileChunks(stream, buffer), () => { buffer.endEntry() })
          }

          override def onUpstreamFinish(): Unit = {
            println("Upstream finish")
            closeInput()
            if (buffer.isEmpty) completeStage()
            else {
              buffer.close()
              if (isAvailable(out)) {
                push(out, buffer.toByteString)
              }
            }
          }
        }
      )

      private def closeInput(): Unit = {
        currentStream.foreach(_.close)
        currentStream = None
      }

      private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
        // This seems like a good trade-off between single-byte
        // read I/O performance and doubling the ZipBuffer size.
        //
        // And it's still a decent defense against DDOS resource
        // limit attacks.
        val readBuffer = new Array[Byte](1024)
        var done = false

        def result: Stream[ByteString] =
          if (done) Stream.empty
          else {
            try {
              while (!done && buffer.remaining > 0) {
                val bytesToRead = Math.min(readBuffer.length, buffer.remaining)
                val count = stream.read(readBuffer, 0, bytesToRead)
                if (count == -1) {
                  stream.close
                  done = true
                } else buffer.write(readBuffer, count)
              }
              buffer.toByteString #:: result
            } catch {
              case NonFatal(e) =>
                closeInput()
                throw e
            }
          }

        result.iterator
      }
    }
}
object StreamedZip {
  type ZipFilePath = String
  type StreamGenerator = () => Source[ByteString, NotUsed]
  type ZipSource = (ZipFilePath, StreamGenerator)

  def apply()(implicit ec: ExecutionContext, mat: ActorMaterializer) = new StreamedZip()
}
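For context, the stage is meant to be driven like this (a minimal sketch of my own; s3ByteSource stands in for whatever produces the per-file byte source):

// Hypothetical wiring, assuming an implicit ActorMaterializer and
// ExecutionContext are in scope.
def s3ByteSource(key: String): Source[ByteString, NotUsed] = ??? // placeholder

val files: List[StreamedZip.ZipSource] = List(
  "docs/a.txt" -> (() => s3ByteSource("docs/a.txt")),
  "docs/b.txt" -> (() => s3ByteSource("docs/b.txt"))
)

// One continuous stream of zip-archive bytes, ready for Ok.chunked(...).
val zipStream: Source[ByteString, NotUsed] = Source(files).via(StreamedZip())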
class ZipBuffer(val bufferSize: Int = 64 * 1024) {
  import java.util.zip.{ZipEntry, ZipOutputStream}

  private var builder = new ByteStringBuilder()
  private val zip = new ZipOutputStream(builder.asOutputStream) {
    // this MUST ONLY be used after flush()!
    def setOut(newStream: OutputStream): Unit = out = newStream
  }
  private var inEntry = false
  private var closed = false

  def close(): Unit = {
    endEntry()
    closed = true
    zip.close()
  }

  def remaining(): Int = bufferSize - builder.length

  def isEmpty(): Boolean = builder.isEmpty

  def startEntry(path: String): Unit =
    if (!closed) {
      endEntry()
      zip.putNextEntry(new ZipEntry(path))
      inEntry = true
    }

  def endEntry(): Unit =
    if (!closed && inEntry) {
      inEntry = false
      zip.closeEntry()
    }

  def write(byte: Int): Unit =
    if (!closed && inEntry) zip.write(byte)

  def write(bytes: Array[Byte], length: Int): Unit =
    if (!closed && inEntry) zip.write(bytes, 0, length)

  def toByteString(): ByteString = {
    zip.flush()
    val result = builder.result
    builder = new ByteStringBuilder()
    // set the underlying output for the zip stream to be the buffer
    // directly, so we don't have to copy the zip'd byte array.
    zip.setOut(builder.asOutputStream)
    result
  }
}
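The trick in ZipBuffer is that toByteString() drains whatever the ZipOutputStream has written so far and swaps a fresh builder underneath it, so the zipped bytes can be emitted downstream incrementally instead of accumulating. A minimal lifecycle sketch (my own illustration, not part of the question):

val buf = new ZipBuffer()
buf.startEntry("hello.txt")
val data = "hello world".getBytes("UTF-8")
buf.write(data, data.length)
buf.endEntry()
val middle = buf.toByteString() // entry header plus deflated payload so far
buf.close()                     // writes the zip central directory
val tail = buf.toByteString()   // the remaining archive bytes
// middle ++ tail is a complete zip archive containing hello.txt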
Best Answer
I ended up keeping the ZipBuffer from above and solving the overall problem with the plain Akka Streams DSL instead of a custom graph stage.

My solution is as follows:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Graph, SourceShape}
import akka.util.ByteString
import com.company.config.AWS
import org.log4s.getLogger
case class S3StreamingServiceLike(awsConf: AWS, s3Client: S3ClientAlpakka)(
    implicit sys: ActorSystem,
    mat: ActorMaterializer)
    extends S3StreamingService {

  private implicit class ConcatSyntax[T, U](source: Source[T, U]) {
    def ++[TT >: T, M](that: Graph[SourceShape[TT], M]): Source[TT, U] = //scalastyle:ignore
      source.concat(that)
  }

  private val logger = getLogger

  private sealed trait ZipElement
  private case class FileStart(name: String, index: Int, outOf: Int) extends ZipElement
  private case class FileEnd(name: String, index: Int, outOf: Int) extends ZipElement
  private case class FilePayload(byteString: ByteString) extends ZipElement
  private case object EndZip extends ZipElement

  private def payloadSource(filename: String) =
    s3Client.download(awsConf.s3BucketName, filename).map(FilePayload.apply)

  private def fileNameToZipElements(filename: String,
                                    index: Int,
                                    outOf: Int): Source[ZipElement, NotUsed] =
    Source.single(FileStart(filename, index, outOf)) ++
      payloadSource(filename) ++
      Source.single(FileEnd(filename, index, outOf))

  def streamFilesAsZip(filenames: List[String])(forUser: String): Source[ByteString, NotUsed] = {
    val zipBuffer = new ZipBuffer()

    val zipElementSource: Source[ZipElement, NotUsed] =
      Source(filenames.zipWithIndex).flatMapConcat {
        case (filename, index) => fileNameToZipElements(filename, index + 1, filenames.length)
      } ++ Source.single(EndZip)

    zipElementSource
      .map {
        case FileStart(name, index, outOf) =>
          logger.info(s"Zipping file #$index of $outOf with name $name for user $forUser")
          zipBuffer.startEntry(name)
          None
        case FilePayload(byteString) =>
          if (byteString.length > zipBuffer.remaining()) {
            throw new Exception(
              s"Bytestring size exceeded buffer size ${byteString.length} > ${zipBuffer.remaining}")
          }
          zipBuffer.write(byteString.toArray, byteString.length)
          Some(zipBuffer.toByteString())
        case FileEnd(name, index, outOf) =>
          logger.info(s"Finished zipping file #$index of $outOf with $name for user $forUser")
          zipBuffer.endEntry()
          Some(zipBuffer.toByteString())
        case EndZip =>
          zipBuffer.close()
          Some(zipBuffer.toByteString())
      }
      .collect {
        case Some(bytes) if bytes.length > 0 => bytes
      }
  }
}
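Because nothing beyond the 64 KB ZipBuffer is held in memory, the resulting source can be streamed straight out of a controller. A hypothetical Play 2.6 action (the action name, service instance, and filenames are placeholders of mine):

def downloadZip(forUser: String) = Action {
  val zip: Source[ByteString, NotUsed] =
    s3StreamingService.streamFilesAsZip(List("a.txt", "b.txt"))(forUser)
  Ok.chunked(zip)
    .as("application/zip")
    .withHeaders("Content-Disposition" -> "attachment; filename=files.zip")
}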
On "scala - Zipping a List[Source[ByteString, NotUsed]] on the fly with Akka Streams", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/47506276/