gpt4 book ai didi

multithreading - 当 8-10 个 Actor 同时运行时,一些 Scala Actor 进入等待状态

转载 作者:行者123 更新时间:2023-12-02 06:23:32 26 4
gpt4 key购买 nike

在我的模型中,大约有 8-9 个 Scala Actor。每个参与者在 RabbitMQ 服务器上都有自己的队列

在每个 Actor 的 act 方法中。它不断地列在队列中喜欢

def act {
this ! 1
loop {
react {
case 1 => processMessage(QManager.getMessage); this ! 1
}
}
}

我是rabbitMq QManager的getMessage方法

def getMessage: MyObject = {
getConnection
val durable = true
channel.exchangeDeclare(EXCHANGE, "direct", durable)
channel.queueDeclare(QUEUE, durable, false, false, null)
channel queueBind (QUEUE, EXCHANGE, _ROUTING_KEY)
consumer = new QueueingConsumer(channel)
channel basicConsume (QUEUE, false, consumer)

var obj = new MyObject
try {
val delivery = consumer.nextDelivery
val msg = new java.io.ObjectInputStream(
new java.io.ByteArrayInputStream(delivery.getBody)).readObject()
obj = msg.asInstanceOf[MyObject]
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
} catch {
case e: Exception =>logger.error("error in Get Message", e);endConnection
}
endConnection
obj
}

所有 9 个 Actors 都有自己的对象类型和自己的 QManager

在 GetMessage 中我使用 Rabbitmq QueueConsumer

 val delivery = consumer.nextDelivery

nextDelivery 方法在队列中找到时返回一个对象此方法使 actor 处于等待状态

当我启动所有 8 个 Actor 时,只有 4 个工作正常,其他没有说明。我已经测试了每个独立运行的 Actor ,他们在单独启动时工作正常

当我启动超过 4 个 Actor 时会出现问题

Scala actors 的线程化有什么问题吗?

最佳答案

免责声明:我是Akka的PO

正如 Rex 所说,您正忙于等待,占用共享线程池中的线程。

我不知道您是否可以选择测试 Akka,但我们支持 AMQP 消费者(和生产者)作为参与者:Akka-AMQP

生成 AMQP 消息:

    val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), producerId = Some("my_producer"))
producer ! Message("Some simple sting data".getBytes, "some.routing.key")

消费 AMQP 消息:

val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val myConsumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing.key", actorOf(new Actor { def receive = {
case Delivery(payload, _, _, _, _, _) => log.info("Received delivery: %s", new String(payload))
}}), None, Some(exchangeParameters)))

另一种选择是使用 Akka-Camel与参与者一起消费和生产 AMQP 消息

关于multithreading - 当 8-10 个 Actor 同时运行时,一些 Scala Actor 进入等待状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4815938/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com