gpt4 book ai didi

scala - 使用背压接收逐行文件 IO

转载 作者:行者123 更新时间:2023-12-04 17:45:20 24 4
gpt4 key购买 nike

我有一个文件处理作业,目前使用带有手动管理背压的 akka actor 来处理处理管道,但我从未能够在输入文件读取阶段成功管理背压。

这个作业接受一个输入文件,并按照每行开头的 ID 号对行进行分组,然后一旦遇到具有新 ID 号的行,它就会通过消息将分组的行推送给处理参与者,然后继续新的 ID 号,一直到它到达文件的末尾。

这似乎是 Akka Streams 的一个很好的用例,使用 File 作为接收器,但我仍然不确定三件事:

1)如何逐行读取文件?

2) 如何按每行上的 ID 分组?我目前为此使用非常命令式的处理,并且我认为我不会在流管道中具有相同的能力。

3)如何应用背压,以便我不会以比下游处理数据的速度更快地将行读入内存?

最佳答案

Akka流'groupBy是一种方法。但是 groupBy 有一个 maxSubstreams param 这将要求您预先知道 ID 的最大范围。所以:下面的解决方案使用 scan识别相同 ID 的块,以及 splitWhen拆分为子流:

object Main extends App {
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()

def extractId(s: String) = {
val a = s.split(",")
a(0) -> a(1)
}

val file = new File("/tmp/example.csv")

private val lineByLineSource = FileIO.fromFile(file)
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)

val future: Future[Done] = lineByLineSource
.map(extractId)
.scan( (false,"","") )( (l,r) => (l._2 != r._1, r._1, r._2) )
.drop(1)
.splitWhen(_._1)
.fold( ("",Seq[String]()) )( (l,r) => (r._2, l._2 ++ Seq(r._3) ))
.concatSubstreams
.runForeach(println)

private val reply = Await.result(future, 10 seconds)
println(s"Received $reply")
Await.ready(system.terminate(), 10 seconds)
}
extractId将行拆分为 id -> 数据元组。 scan在 id -> 数据元组前面加上一个 start-of-ID-range 标志。 drop将引物元件放到 scan . splitwhen为每个范围开始启动一个新的子流。 fold将子流连接到列表并删除 start-of-ID-range bool 值,以便每个子流生成单个元素。代替折叠,您可能需要自定义 SubFlow它处理单个 ID 的行流并为 ID 范围发出一些结果。 concatSubstreams将 splitWhen 生成的每个 ID 范围的子流合并回由 runForEach 打印的单个流中.

运行:
$ cat /tmp/example.csv
ID1,some input
ID1,some more input
ID1,last of ID1
ID2,one line of ID2
ID3,2nd before eof
ID3,eof

输出是:
(ID1,List(some input, some more input, last of ID1))
(ID2,List(one line of ID2))
(ID3,List(2nd before eof, eof))

关于scala - 使用背压接收逐行文件 IO,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36656817/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com