gpt4 book ai didi

scala - 使用 Akka Actors 的 OutOfMemoryError

转载 作者:行者123 更新时间:2023-12-04 20:37:57 25 4
gpt4 key购买 nike

我有一个使用来自 RabbitMQ 的消息的应用程序,我正在使用 Actors 来处理工作。

这是我的方法:

object QueueConsumer extends Queue {

def consumeMessages = {
setupListener(buildChannel(resultsQueueName), resultsQueueName,
resultsCallback)
}

private def setupListener(receivingChannel: Channel, queue: String,
f: (String) => Any) {
Akka.system.scheduler.scheduleOnce(Duration(10, TimeUnit.SECONDS),
Akka.system.actorOf(Props(new QueueActor(receivingChannel, queue, f))), "")
}

}

class QueueActor(channel:Channel, queue:String, f:(String) => Any) extends Actor{

def receive = {
case _ => startReceiving
}

def startReceiving = {
val consumer = new QueueingConsumer(channel)
channel.basicConsume(queue, false, consumer)
while (true) {
val delivery = consumer.nextDelivery()
val msg = new String(delivery.getBody())
context.actorOf(Props(new Actor {
def receive = {
case some: String => f(some)
}
})) ! msg
channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
}
}

}

运行几秒钟后,它会抛出 java.lang.OutOfMemoryError:超出 GC 开销限制 .

我认为这是因为我正在为收到的每条消息启动一个新的 Actor - 所以如果我有 100000 条消息,它将创建 100000 个 Actor。这是一个好方法还是我应该实现诸如“ Actor 池”之类的东西?

任何人都知道如何在我的场景中避免 OutOfMemoryError ?

预先感谢。

编辑1:

改变了方法:
class Queue2(json:String) extends Actor {

def receive = {
case x: String =>
val envelope = MessageEnvelopeParser.toObject(x)
val processor = ProcessQueueServiceFactory.getProcessResultsService()
envelope.messages.foreach(message => processor.process(message))
}

}

object Queue2 {
def props(json: String): Props = Props(new Queue2(json))
}

class QueueActor(channel:Channel, queue:String) extends Actor {

def receive = {
case _ => startReceiving
}

def startReceiving = {
val consumer = new QueueingConsumer(channel)
channel.basicConsume(queue, false, consumer)
while (true) {
val delivery = consumer.nextDelivery()
val msg = new String(delivery.getBody())
context.actorOf(Queue2.props(msg))
channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
}
}
}

最佳答案

您的每条消息参与者在完成后需要停止自己,否则他们将永远留在那里。请参阅 Actor lifecycle 上的文档和 stopping Actors .在这里你只需要添加 context.stop(self)处理完成后。

关于scala - 使用 Akka Actors 的 OutOfMemoryError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31995994/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com