作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试将传入的 Akka 字节流(来自 http 请求的正文,但也可能来自文件)拆分为多个定义大小的文件。
例如,如果我上传一个 10Gb 的文件,它会创建类似于 10 个 1Gb 的文件。这些文件将具有随机生成的名称。我的问题是我真的不知道从哪里开始,因为我读过的所有响应和示例要么将整个块存储到内存中,要么使用基于字符串的分隔符。除了我不能真正拥有 1Gb 的“块”,然后将它们写入磁盘..
有没有简单的方法来执行这种操作?我唯一的想法是使用这样的东西 http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings但转化为类似 FlowShape[ByteString, File]
,将自己写入文件中的块,直到达到最大文件大小,然后创建一个新文件,等等,并流回创建的文件。这看起来是一个糟糕的想法,没有正确使用 Akka ..
提前致谢
最佳答案
对于此类问题,我经常使用纯函数式、非 akka 技术,然后将这些函数“提升”为 akka 结构。我的意思是我尝试只使用 scala“东西”,然后尝试将这些东西包装在 akka 中......
文件创建
从 FileOutputStream
开始基于“随机生成的名称”的创建:
def randomFileNameGenerator : String = ??? //not specified in question
import java.io.FileOutputStream
val randomFileOutGenerator : () => FileOutputStream =
() => new FileOutputStream(randomFileNameGenerator)
case class FileState(byteCount : Int = 0,
fileOut : FileOutputStream = randomFileOutGenerator())
ByteString
违反最大文件大小阈值。 :
import akka.util.ByteString
val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
(state, byteString, maxBytes) =>
state.byteCount + byteString.length > maxBytes
FileState
的函数。如果我们已经最大化了当前的容量,或者如果它仍然低于容量,则返回当前状态:
val closeFileInState : FileState => Unit =
(_ : FileState).fileOut.close()
val getCurrentFileState(FileState, ByteString, Int) => FileState =
(state, byteString, maxBytes) =>
if(isEndOfChunk(maxBytes, state, byteString)) {
closeFileInState(state)
FileState()
}
else
state
FileOutputStream
:
val writeToFileAndReturn(FileState, ByteString) => FileState =
(fileState, byteString) => {
fileState.fileOut write byteString.toArray
fileState copy (byteCount = fileState.byteCount + byteString.size)
}
//the signature ordering will become useful
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =
writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)
GenTraversableOnce
是具有折叠运算符的任何集合,无论是否并行。这些包括迭代器、向量、数组、序列、scala 流、......最终
writeToChunkedFile
函数完美匹配
GenTraversableOnce#fold的签名:
val anyIterable : Iterable = ???
val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes))
FileOutputStream
也需要关闭。由于折叠只会发出最后一个
FileState
我们可以关闭那个:
closeFileInState(finalFileState)
fold
来自
FlowOps#fold这恰好与
GenTraversableOnce
匹配签名。因此,我们可以将常规函数“提升”为类似于我们使用
Iterable
的方式的流值。折叠:
import akka.stream.scaladsl.Flow
def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] =
Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))
ActorSystem
在单元测试中,只是常规语言数据结构。
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
def byteStringSink(maxBytes : Int) : Sink[ByteString, _] =
chunkerFlow(maxBytes) to (Sink foreach closeFileInState)
Sink
排水
HttpEntity
来自
HttpRequest
.
关于scala - Akka 流 - 将 ByteString 流拆分为多个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41402495/
我是一名优秀的程序员,十分优秀!