
Scala: reading a file with concurrency (akka) and an asynchronous API (nio2), processing lines, and writing the output to a new file


1: I'm having trouble processing a large text file - 10 gigs+

The single-threaded solution is as follows:

import java.io.{File, PrintWriter}

val writer = new PrintWriter(new File(output.getOrElse("output.txt")))
for (line <- scala.io.Source.fromFile(file.getOrElse("data.txt")).getLines()) {
  writer.println(DigestUtils.HMAC_SHA_256(line))
}
writer.close()

2: I tried processing with concurrency:
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val futures = scala.io.Source.fromFile(file.getOrElse("data.txt")).getLines
  .map { s => Future { DigestUtils.HMAC_SHA_256(s) } }.to[Vector]
val results = futures.map { Await.result(_, 10000.seconds) }

This fails with a GC overhead limit exceeded exception (see Appendix A for the stack trace): the .to[Vector] call forces every line (and its Future) into memory at once.
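A variant that keeps memory bounded might pull lines in fixed-size batches and wait for each batch to finish before pulling the next (an untested sketch; hmacSha256 is a hypothetical stand-in for the DigestUtils.HMAC_SHA_256 helper above):

import java.io.{File, PrintWriter}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for the DigestUtils.HMAC_SHA_256 helper
def hmacSha256(line: String): String = line

val writer = new PrintWriter(new File("output.txt"))
val lines = scala.io.Source.fromFile("data.txt").getLines()

// grouped() pulls from the iterator lazily, so at most one batch of
// futures is in flight and the whole file is never resident in memory
lines.grouped(10000).foreach { batch =>
  val futures = batch.map(line => Future(hmacSha256(line)))
  futures.foreach(f => writer.println(Await.result(f, 10.minutes)))
}
writer.close()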

3: I tried combining Akka IO with AsynchronousFileChannel, following https://github.com/drexin/akka-io-file. Using FileSlurp I was able to read the file in byte chunks, but I could not find a way to read the file line by line, which is what I need.
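Reassembling lines from byte chunks presumably amounts to carrying the trailing partial line over between chunks, along these lines (my own untested sketch, not part of akka-io-file):

import java.nio.charset.StandardCharsets

// Carries the trailing partial line between byte chunks.
// Assumes UTF-8 with "\n" line endings; a multi-byte character split
// across a chunk boundary would need an incremental CharsetDecoder.
class LineAssembler {
  private var carry = ""

  // Feed one chunk, get back the complete lines it finishes
  def feed(chunk: Array[Byte]): Seq[String] = {
    val text = carry + new String(chunk, StandardCharsets.UTF_8)
    val parts = text.split("\n", -1) // limit -1 keeps a trailing empty segment
    carry = parts.last               // last segment is a (possibly empty) partial line
    parts.dropRight(1)
  }

  // Call once at EOF to recover a final line with no trailing newline
  def flush(): Option[String] = {
    val last = carry
    carry = ""
    if (last.nonEmpty) Some(last) else None
  }
}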

Any help would be greatly appreciated. Thank you.

Appendix A
[error] (run-main) java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.nio.CharBuffer.wrap(Unknown Source)
at sun.nio.cs.StreamDecoder.implRead(Unknown Source)
at sun.nio.cs.StreamDecoder.read(Unknown Source)
at java.io.InputStreamReader.read(Unknown Source)
at java.io.BufferedReader.fill(Unknown Source)
at java.io.BufferedReader.readLine(Unknown Source)
at java.io.BufferedReader.readLine(Unknown Source)
at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:67)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:716)
at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:692)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at com.test.Twitterhashconcurrentcli$.doConcurrent(Twitterhashconcurrentcli.scala:35)
at com.test.Twitterhashconcurrentcli$delayedInit$body.apply(Twitterhashconcurrentcli.scala:62)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)

Best Answer

The trick here is to avoid anything that might read all of the data into memory at once. If you simply iterate and fire lines at workers, you run that risk: sending to an actor is asynchronous, so you could end up reading the entire file into memory with the lines sitting in the actors' mailboxes, which can lead to an OOM exception. A better high-level approach is to use a single master actor with a pool of child workers under it for the processing. Keep a lazy stream in the master (like the Iterator that scala.io.Source.fromX returns) and use the work-pulling pattern in the workers to keep their mailboxes from filling up with data. Then, when the iterator has no more lines, the master stops the workers and then itself (and if you want, you can also use this point to shut down the actor system).

Here's a very rough outline. Note that I have not tested this:

import akka.actor._
import akka.routing.RoundRobinRouter
import akka.routing.Broadcast
import scala.io.Source

object FileReadMaster {
  case class ProcessFile(filePath: String)
  case class ProcessLines(lines: List[String], last: Boolean = false)
  case class LinesProcessed(lines: List[String], last: Boolean = false)

  case object WorkAvailable
  case object GimmeeWork
}

class FileReadMaster extends Actor {
  import FileReadMaster._

  val workChunkSize = 10
  val workersCount = 10

  def receive = waitingToProcess

  def waitingToProcess: Receive = {
    case ProcessFile(path) =>
      val workers = (for (i <- 1 to workersCount) yield context.actorOf(Props[FileReadWorker])).toList
      val workersPool = context.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = workers)))
      val it = Source.fromFile(path).getLines
      workersPool ! Broadcast(WorkAvailable)
      context.become(processing(it, workersPool, workers.size))

      //Set up deathwatch on all workers
      workers foreach (context watch _)
  }

  def processing(it: Iterator[String], workers: ActorRef, workersRunning: Int): Receive = {
    case ProcessFile(path) =>
      sender ! Status.Failure(new Exception("already processing!!!"))

    case GimmeeWork if it.hasNext =>
      val lines = List.fill(workChunkSize) {
        if (it.hasNext) Some(it.next)
        else None
      }.flatten

      //This chunk is the last one when the iterator is now exhausted
      sender ! ProcessLines(lines, !it.hasNext)

      //If no more lines, broadcast poison pill
      if (!it.hasNext) workers ! Broadcast(PoisonPill)

    case GimmeeWork =>
      //Get here if no more work is left

    case LinesProcessed(lines, last) =>
      //Do something with the lines

    //Termination for the last worker
    case Terminated(ref) if workersRunning == 1 =>
      //Done with all work, do what you gotta do when done here

    //Terminated for a non-last worker
    case Terminated(ref) =>
      context.become(processing(it, workers, workersRunning - 1))
  }
}

class FileReadWorker extends Actor {
  import FileReadMaster._

  def receive = {
    case ProcessLines(lines, last) =>
      sender ! LinesProcessed(lines.map(_.reverse), last)
      sender ! GimmeeWork

    case WorkAvailable =>
      sender ! GimmeeWork
  }
}

The idea is that the master iterates over the file's contents and sends chunks of work to a pool of child workers. When file processing starts, the master tells all the children that work is available. Each child then keeps requesting work until there is no more work left. When the master detects that the file is done being read, it broadcasts a poison pill to the children, which lets them finish any outstanding work and then stop. When all of the children have stopped, the master can do whatever cleanup it needs. A minimal way to kick this off is sketched below.
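For completeness, starting the whole thing might look like this (my own sketch, not from the original answer, assuming the same Akka 2.2-era API as the outline above):

import akka.actor.{ActorSystem, Props}

object Main extends App {
  val system = ActorSystem("file-processing")
  val master = system.actorOf(Props[FileReadMaster], "master")
  // The master lazily pulls lines from the file as the workers request work
  master ! FileReadMaster.ProcessFile("data.txt")
}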

Again, this is very rough, based on what I think you're asking. If I'm off in any area, let me know and I can amend the answer.

Regarding "Scala: reading a file with concurrency (akka) and an asynchronous API (nio2), processing lines, and writing the output to a new file", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/22383939/
