gpt4 book ai didi

scala - Akka 流 : How to group a list of files in a source by size?

转载 作者:行者123 更新时间:2023-12-01 10:21:56 25 4
gpt4 key购买 nike

所以我目前有一个 akka 流来读取文件列表,还有一个接收器来连接它们,并且工作得很好:

val files = List("a.txt", "b.txt", "c.txt") // and so on;
val source = Source(files).flatMapConcat(f => FileIO.fromPath(Paths.get(f)))

val sink = Sink.fold[ByteString, ByteString](ByteString(""))(_ ++ ByteString("\n" ++ _) // Concatenate

source.toMat(sink)(Keep.right).run().flatMap(concatByteStr => writeByteStrToFile(concatByteStr, "an-output-file.txt"))

虽然这对于一个简单的案例来说很好,但文件相当大(大约 GB,并且无法容纳在我运行此应用程序的机器的内存中。所以我想分 block 在字节字符串达到一定大小后它。一个选项是用 Source.grouped(N) 来做,但是文件的大小变化很大(从 1 KB 到 2 GB),所以不能保证文件的大小正常化。

我的问题是是否有一种方法可以根据字节串​​的大小对写入文件进行分 block 。 akka 流的文档非常多,我在弄清楚图书馆时遇到了麻烦。任何帮助将不胜感激。谢谢!

最佳答案

Akka Streams 的 FileIO 模块为您提供了一个流式 IO Sink 来写入文件,以及实用方法来分 block ByteString。你的例子会变成类似的东西

val files = List("a.txt", "b.txt", "c.txt") // and so on;

val source = Source(files).flatMapConcat(f => FileIO.fromPath(Paths.get(f)))
val chunking = Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)
val sink: Sink[ByteString, Future[IOResult]] = FileIO.toPath(Paths.get("an-output-file.txt"))

source.via(chunking).runWith(sink)

使用 FileIO.toPath 接收器避免将整个折叠的 ByteString 存储到内存中(因此允许正确的流式传输)。

有关此 Akka 模块的更多详细信息,请参阅 docs .

关于scala - Akka 流 : How to group a list of files in a source by size?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48236663/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com