gpt4 book ai didi

scala - Scala系统进程挂起

转载 作者:行者123 更新时间:2023-12-01 12:43:52 24 4
gpt4 key购买 nike

我有一个使用ProcessBuilder执行外部流程的演员:

  def act {
while (true) {
receive {
case param: String => {
val filePaths = Seq("/tmp/file1","/tmp/file2")
val fileList = new ByteArrayInputStream(filePaths.mkString("\n").getBytes())
val output = s"myExecutable.sh ${param}" #< fileList !!<

doSomethingWith(output)
}
}
}
}

我让数百名演员并行运行。有时,由于未知原因,过程(!!)的执行永不返回。它永远挂着。该特定参与者无法处理新消息。有什么方法可以设置超时以使该过程返回,如果超时超过重试次数?

这些处决永远被搁置的原因可能是什么?因为这些命令的持续时间不应超过几毫秒。

编辑1:
我观察到的两个重要事实:
  • 仅在Linux上,在Max OS X上不会发生此问题
  • 当我不使用ByteArrayInputStream作为执行的输入时,程序不会挂起
  • 最佳答案

    我有一个使用ProcessBuilder来执行外部过程的角色:...我运行数百个并行运行的角色...

    这是并行进行的非常繁重的处理,只是在每种情况下要完成几毫米的工作。并发处理机制的等级如下(在资源使用,可伸缩性和性能方面从最差到最佳):

  • 进程=重量级
  • thread =中等重量(可以在单个进程空间中执行数十个线程)
  • actor =轻量级(数十个actor可以通过利用单个共享线程或多个共享线程来执行)

  • 同时产生许多进程会占用大量操作系统资源-用于进程的创建和终止。在极端情况下,开始和结束进程的O / S开销可能比实际作业执行消耗数百或数千个CPU和内存资源。这就是创建线程模型(以及更有效的参与者模型)的原因。认为您当前的处理是在极其可扩展的参与者中进行“类似CGI的”不可扩展的O / S压力处理-这是一种反模式。不需要花太多精力将某些操作系统压到破损的程度:这可能正在发生。

    另外,如果正在读取的文件非常大,则最好从可伸缩性和可靠性上限制同时读取同一磁盘上文件的进程数。最多可以同时读取10个进程,我怀疑100个进程是否可以。

    Actor应该如何调用外部程序?

    当然,如果将myExecutable.sh中的逻辑转换为Scala,则完全不需要创建进程。实现可伸缩性,性能和可靠性将更加简单。

    假设这是不可能/不希望的,则应限制创建的进程总数,并应随时间在不同的Actor /请求之间重用它们。

    第一个解决方案选项:(1)创建一个可重用的进程池(例如,大小为10)(2)创建参与者(例如100),它们通过ProcessIO与该进程进行通信(3),如果所有进程都在忙于处理,则可以/适当地阻止Actor,直到一个可用为止。这个选项的问题是:复杂性; 100个参与者必须完成工作才能与流程池进行交互,并且当流程成为瓶颈时,参与者本身几乎没有增加任何价值。

    更好的解决方案选项:(1)创建数量有限的参与者(例如10)(2)让每个参与者创建1个长期运行的私有流程(即没有池)(3)让每个参与者通过ProcessIO与之通信,如果进程忙,则阻塞。问题:仍然没有尽可能简单;演员与阻止流程的互动不畅。

    最佳解决方案选项:(1)没有参与者,从主线程进行简单的for循环将获得与参与者相同的好处(2)创建有限数量的进程(10)(3)通过for循环,依次交互每个进程使用ProcessIO(如果忙-阻止或跳至下一个迭代)

    有什么方法可以设置超时以使该过程返回,如果超时超过重试次数?

    确实有。角色最强大的功能之一是某些角色可以生成其他角色并充当其角色的管理员(接收失败或超时消息,他们可以从中恢复/重新启动)。对于“本地scala演员”,这是通过基本编程来完成的,生成您自己的检查和超时消息。但是,由于Akka方法更强大,更简单,因此我将不作介绍。加上下一个主要的Scala版本(2.11)将使用Akka作为受支持的演员模型,并带有“本地scala演员” deprecated

    这是一个具有编程超时/重启(未编译/测试)的Akka监督演员示例。当然,如果您选择第三个解决方案选项,这将没有用:
    import scala.concurrent.duration._
    import scala.collection.immutable.Set

    class Supervisor extends Actor {
    override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException => Resume // resumes (reuses) all child actors
    case _: NullPointerException => Restart // restarts all child actors
    case _: IllegalArgumentException => Stop // terminates this actor & all children
    case _: Exception => Escalate // supervisor to receive exception
    }

    val worker = context.actorOf(Props[Worker]) // creates a supervised child actor
    var pendingRequests = Set.empty[WorkerRequest]

    def receive = {
    case req: WorkRequest(sender, jobReq) =>
    pendingRequests = pendingRequests + req
    worker ! req
    system.scheduler.scheduleOnce(10 seconds, self, WorkTimeout(req))
    case resp: WorkResponse(req @ WorkRequest(sender, jobReq), jobResp) =>
    pendingRequests = pendingRequests - req
    sender ! resp
    case timeout: WorkTimeout(req) =>
    if (pendingRequests get req != None) {
    // restart the unresponsive worker
    worker restart
    // resend all pending requests
    pendingRequests foreach{ worker ! _ }
    }
    }
    }

    请注意:演员监督的这种方法无法克服糟糕的建筑和设计。如果您从适合您的要求的适当的过程/线程/角色设计开始,那么监督将提高可靠性。但是,如果您从不良的设计入手,则存在从O / S级别故障中使用“强力”恢复的风险,这可能会加剧您的问题-使过程可靠性变差甚至导致机器崩溃。

    关于scala - Scala系统进程挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21917823/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com