
scala - Dynamically zipping a List[Source[ByteString, NotUsed]] with Akka Streams


I have a list of Source[ByteString, NotUsed], each paired with a filename from an S3 bucket. These need to be zipped in constant memory and served from Play 2.6.
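
In other words, the goal is an operation of roughly the following shape (a hypothetical signature for illustration, not from the original post):

import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.util.ByteString

// (filename inside the archive, file contents) pairs in, zipped bytes out,
// using only a fixed-size buffer regardless of file sizes.
def zipSources(files: List[(String, Source[ByteString, NotUsed])]): Source[ByteString, NotUsed] = ???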

There is a somewhat similar question here: stream a zip created on the fly with play 2.5 and akka stream with backpressure

A relevant code snippet using Akka Streams (which Play 2.6+ requires) is here:

https://gist.github.com/kirked/03c7f111de0e9a1f74377bf95d3f0f60

My experiments so far are based on the gist above. However, the gist solves a different problem: it streams files from disk by passing an InputStream into the graph stage. There is no safe way to convert my Source[ByteString, NotUsed] into an InputStream, so I can't use the snippet as-is.

My experiment so far changes the input type from () => InputStream to () => Source[ByteString, NotUsed] and then consumes the source with source.runForeach(...).

Most of my changes are here:

override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val src: Source[ByteString, NotUsed] = source()
  val operation = src.runForeach(bytestring => {
    val byteInputStream = new ByteArrayInputStream(bytestring.toArray)
    emitMultiple(out, fileChunks(byteInputStream, buffer))
  })
  operation.onComplete {
    case _ => buffer.endEntry()
  }
  Await.ready(operation, 5.minute)
}

I know this blocks, but I'm not sure whether that is allowed in this context.
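
(As an aside, the supported way to react to a Future inside a GraphStage without blocking is getAsyncCallback. Below is a minimal sketch under these assumptions: it reuses the implicit ExecutionContext and materializer of the surrounding class, needs scala.util.{Success, Failure} in scope, and the runFold gathers each whole entry in memory first, so it illustrates only the callback mechanism, not a constant-memory solution:)

override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)

  // Both callbacks run on the stage's own thread, so touching `buffer`,
  // emitting downstream, and failing the stage are all safe inside them.
  val entryDone = getAsyncCallback[ByteString] { bytes =>
    buffer.write(bytes.toArray, bytes.length)
    buffer.endEntry()
    emit(out, buffer.toByteString())
  }
  val entryFailed = getAsyncCallback[Throwable](failStage)

  source().runFold(ByteString.empty)(_ ++ _).onComplete {
    case Success(bytes) => entryDone.invoke(bytes)
    case Failure(ex)    => entryFailed.invoke(ex)
  }
}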

How can I accomplish this safely?

EDIT

I also tried this version, which stays closer to the gist:
override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val stream = source().runWith(StreamConverters.asInputStream(1.minute))
  currentStream = Some(stream)
  emitMultiple(out, fileChunks(stream, buffer), () => buffer.endEntry())
}

However, it fails with the following stack trace:

[ERROR] [11/27/2017 09:26:38.428] [alpakka-akka.actor.default-dispatcher-3] [akka://alpakka/user/StreamSupervisor-0/flow-0-0-headSink] Error in stage [com.company.productregistration.services.s3.StreamedZip@7f573427]: Reactive stream is terminated, no reads are possible
java.io.IOException: Reactive stream is terminated, no reads are possible
  at akka.stream.impl.io.InputStreamAdapter.subscriberClosedException(InputStreamSinkStage.scala:117)
  at akka.stream.impl.io.InputStreamAdapter.executeIfNotClosed(InputStreamSinkStage.scala:125)
  at akka.stream.impl.io.InputStreamAdapter.read(InputStreamSinkStage.scala:144)
  at com.company.productregistration.services.s3.StreamedZip$$anon$2.result$1(StreamedZip.scala:99)
  at com.company.productregistration.services.s3.StreamedZip$$anon$2.$anonfun$fileChunks$1(StreamedZip.scala:105)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
  at scala.collection.immutable.StreamIterator.$anonfun$next$1(Stream.scala:1058)
  at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:1047)
  at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:1047)
  at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:1052)
  at akka.stream.stage.GraphStageLogic$EmittingIterator.onPull(GraphStage.scala:911)
  at akka.stream.impl.fusing.GraphInterpreter.processPull(GraphInterpreter.scala:506)
  at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:412)
  at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:571)
  at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:541)
  at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:659)
  at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:707)
  at akka.actor.Actor.aroundPreStart(Actor.scala:522)
  at akka.actor.Actor.aroundPreStart$(Actor.scala:522)
  at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:650)
  at akka.actor.ActorCell.create(ActorCell.scala:591)
  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:484)
  at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
  at akka.dispatch.Mailbox.run(Mailbox.scala:223)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



EDIT 2
If I don't set currentStream = Some(stream), I don't get the above error. Also, it does work for some combinations of files. I have a larger file of about 20 megabytes that corrupts the zip if it is the last source; if I place it anywhere else in the list of sources, everything works.

Below is the complete listing of my current graph stage implementation:
import java.io.{ByteArrayInputStream, InputStream, OutputStream}

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.{ByteString, ByteStringBuilder}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal

//scalastyle:off
class StreamedZip(bufferSize: Int = 64 * 1024)(implicit ec: ExecutionContext,
mat: ActorMaterializer)
extends GraphStage[FlowShape[StreamedZip.ZipSource, ByteString]] {

import StreamedZip._

val in: Inlet[ZipSource] = Inlet("StreamedZip.in")
val out: Outlet[ByteString] = Outlet("StreamedZip.out")
override val shape = FlowShape.of(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with StageLogging {
private val buffer = new ZipBuffer(bufferSize)
private var currentStream: Option[InputStream] = None

setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (isClosed(in)) {
if (buffer.isEmpty) completeStage()
else {
buffer.close
push(out, buffer.toByteString)
}
} else pull(in)

override def onDownstreamFinish(): Unit = {
closeInput()
buffer.close
super.onDownstreamFinish()
}
}
)

setHandler(
in,
new InHandler {
override def onPush(): Unit = {
val (filepath, source: StreamGenerator) = grab(in)
buffer.startEntry(filepath)
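            // StreamConverters.asInputStream materializes the Source into a
            // blocking InputStream; each read waits up to the given timeout
            // for the next chunk from upstream.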
val stream = source().runWith(StreamConverters.asInputStream(1.minute))
emitMultiple(out, fileChunks(stream, buffer), () => { buffer.endEntry() })
}

override def onUpstreamFinish(): Unit = {
println("Updstream finish")
closeInput()
if (buffer.isEmpty) completeStage()
else {
buffer.close()
if (isAvailable(out)) {
push(out, buffer.toByteString)
}
}
}
}
)

private def closeInput(): Unit = {
currentStream.foreach(_.close)
currentStream = None
}

private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
// This seems like a good trade-off between single-byte
// read I/O performance and doubling the ZipBuffer size.
//
// And it's still a decent defense against DDOS resource
// limit attacks.
val readBuffer = new Array[Byte](1024)
var done = false

def result: Stream[ByteString] =
if (done) Stream.empty
else {
try {
while (!done && buffer.remaining > 0) {
val bytesToRead = Math.min(readBuffer.length, buffer.remaining)
val count = stream.read(readBuffer, 0, bytesToRead)
if (count == -1) {
stream.close
done = true
} else buffer.write(readBuffer, count)
}
buffer.toByteString #:: result
} catch {
case NonFatal(e) =>
closeInput()
throw e
}
}

result.iterator
}
}
}

object StreamedZip {
type ZipFilePath = String
type StreamGenerator = () => Source[ByteString, NotUsed]
type ZipSource = (ZipFilePath, StreamGenerator)

def apply()(implicit ec: ExecutionContext, mat: ActorMaterializer) = new StreamedZip()

}

class ZipBuffer(val bufferSize: Int = 64 * 1024) {
import java.util.zip.{ZipEntry, ZipOutputStream}

private var builder = new ByteStringBuilder()
private val zip = new ZipOutputStream(builder.asOutputStream) {
// this MUST ONLY be used after flush()!
def setOut(newStream: OutputStream): Unit = out = newStream
}
private var inEntry = false
private var closed = false

def close(): Unit = {
endEntry()
closed = true
zip.close()
}

def remaining(): Int = bufferSize - builder.length

def isEmpty(): Boolean = builder.isEmpty

def startEntry(path: String): Unit =
if (!closed) {
endEntry()
zip.putNextEntry(new ZipEntry(path))
inEntry = true
}

def endEntry(): Unit =
if (!closed && inEntry) {
inEntry = false
zip.closeEntry()
}

def write(byte: Int): Unit =
if (!closed && inEntry) zip.write(byte)

def write(bytes: Array[Byte], length: Int): Unit =
if (!closed && inEntry) zip.write(bytes, 0, length)

def toByteString(): ByteString = {
zip.flush()
val result = builder.result
builder = new ByteStringBuilder()
// set the underlying output for the zip stream to be the buffer
// directly, so we don't have to copy the zip'd byte array.
zip.setOut(builder.asOutputStream)
result
}
}
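
(For reference, a self-contained sketch of the ZipBuffer protocol, with an illustrative entry name and payload: write entry data, drain with toByteString() after each step, and drain once more after close() to pick up the zip trailer:)

val buf = new ZipBuffer()
buf.startEntry("hello.txt")
val payload = "hello, zip".getBytes("UTF-8")
buf.write(payload, payload.length)
val dataChunk = buf.toByteString()    // local file header + (possibly partial) entry data
buf.endEntry()
val entryChunk = buf.toByteString()   // remainder flushed by closeEntry()
buf.close()
val trailerChunk = buf.toByteString() // central directory + end-of-archive record
// dataChunk ++ entryChunk ++ trailerChunk is a complete, valid zip archive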

Best Answer

I ended up keeping the ZipBuffer from above and solving the overall problem with the Akka Streams DSL instead of a custom graph stage.

My solution is below:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, SourceShape}
import akka.util.ByteString
import com.company.config.AWS
import org.log4s.getLogger

case class S3StreamingServiceLike(awsConf: AWS, s3Client: S3ClientAlpakka)(
implicit sys: ActorSystem,
mat: ActorMaterializer)
extends S3StreamingService {

private implicit class ConcatSyntax[T, U](source: Source[T, U]) {
def ++[TT >: T, NotUsed](that: Source[SourceShape[TT], NotUsed]): Source[Any, U] = //scalastyle:ignore
source.concat(that)
}

private val logger = getLogger

private sealed trait ZipElement
private case class FileStart(name: String, index: Int, outOf: Int) extends ZipElement
private case class FileEnd(name: String, index: Int, outOf: Int) extends ZipElement
private case class FilePayload(byteString: ByteString) extends ZipElement
private case object EndZip extends ZipElement

private def payloadSource(filename: String) =
s3Client.download(awsConf.s3BucketName, filename).map(FilePayload.apply)

private def fileNameToZipElements(filename: String,
index: Int,
outOf: Int): Source[ZipElement, NotUsed] =
Source.single(FileStart(filename, index, outOf)) ++
payloadSource(filename) ++
Source.single(FileEnd(filename, index, outOf))

def streamFilesAsZip(filenames: List[String])(forUser: String): Source[ByteString, NotUsed] = {

val zipBuffer = new ZipBuffer()

val zipElementSource: Source[ZipElement, NotUsed] =
Source(filenames.zipWithIndex).flatMapConcat {
case (filename, index) => fileNameToZipElements(filename, index + 1, filenames.length)
} ++ Source.single(EndZip)

zipElementSource
.map {
case FileStart(name, index, outOf) =>
logger.info(s"Zipping file #$index of $outOf with name $name for user $forUser")
zipBuffer.startEntry(name)
None
case FilePayload(byteString) =>
if (byteString.length > zipBuffer.remaining()) {
throw new Exception(
s"Bytestring size exceeded buffer size ${byteString.length} > ${zipBuffer.remaining}")
}
zipBuffer.write(byteString.toArray, byteString.length)
Some(zipBuffer.toByteString())
case FileEnd(name, index, outOf) =>
logger.info(s"Finished zipping file #$index of $outOf with $name for user $forUser")
zipBuffer.endEntry()
Some(zipBuffer.toByteString())
case EndZip =>
zipBuffer.close()
Some(zipBuffer.toByteString())
}
.collect {
case Some(bytes) if bytes.length > 0 => bytes
}
}

}
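
To actually serve the archive from Play 2.6, the resulting Source[ByteString, NotUsed] can be fed straight into a chunked response. A minimal controller sketch follows; the class name, route, and injected-service name are mine, not from the original post:

import javax.inject.Inject
import play.api.mvc._

class ZipDownloadController @Inject()(cc: ControllerComponents,
                                      zipService: S3StreamingService)
    extends AbstractController(cc) {

  def download(forUser: String): Action[AnyContent] = Action {
    val filenames = List("reports/a.csv", "reports/b.csv") // illustrative
    Ok.chunked(zipService.streamFilesAsZip(filenames)(forUser))
      .as("application/zip")
      .withHeaders(CONTENT_DISPOSITION -> "attachment; filename=\"files.zip\"")
  }
}

Because the zip bytes are produced on demand, backpressure from the HTTP connection propagates through the flatMapConcat chain all the way back to the S3 downloads.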

On scala - Dynamically zipping a List[Source[ByteString, NotUsed]] with Akka Streams, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47506276/
