gpt4 book ai didi

actor - Akka::在 ActorSystem 中使用对事件流具有不同优先级的消息

转载 作者:行者123 更新时间:2023-12-04 17:24:31 24 4
gpt4 key购买 nike

我需要将不同类型的消息发布到事件流,而那些
消息应该有不同的优先级,例如,如果 10 个类型的消息
A 已经发布了,毕竟发布了一条类型 B 的消息,并且
B 的优先级高于 A 的优先级 - 消息 B 应该被拾取
即使队列中有 10 条类型为 A 的消息,也由下一个参与者执行。

我已阅读有关优先消息的信息 here并创建了我对该邮箱的简单实现:

  class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(

PriorityGenerator {
case ServerPermanentlyDead => println("Priority:0"); 0
case ServerDead => println("Priority:1"); 1
case _ => println("Default priority"); 10
}

)

然后我在 application.conf 中配置它
akka {

actor {

prio-dispatcher {
type = "Dispatcher"
mailbox-type = "mailbox.PrioritizedMailbox"
}

}

}

并连接到我的 Actor :
private val myActor = actors.actorOf(
Props[MyEventHandler[T]].
withRouter(RoundRobinRouter(HIVE)).
withDispatcher("akka.actor.prio-dispatcher").
withCreator(
new Creator[Actor] {
def create() = new MyEventHandler(storage)
}), name = "eventHandler")

我正在使用 ActorSystem.eventStream.publish 来发送消息,而我的 Actor
订阅了它(我可以在日志中看到消息被处理,但在
FIFO 顺序)。

但是看起来还不够,因为在日志/控制台中我从未见过
诸如“默认优先级”之类的消息。我在这里错过了什么吗?是否
描述的方法适用于事件流或直接调用
向 Actor 发送消息?我如何获得优先消息
事件流?

最佳答案

你的问题是你的 Actor 非常快,所以消息在他们有时间排队之前就被处理了,所以邮箱不能做任何优先级。下面的例子证明了这一点:

  trait Foo 
case object X extends Foo
case object Y extends Foo
case object Z extends Foo

class PrioritizedMailbox(settings: ActorSystem.Settings, cfg: Config)
extends UnboundedPriorityMailbox(
PriorityGenerator {
case X ⇒ 0
case Y ⇒ 1
case Z ⇒ 2
case _ ⇒ 10
})

val s = ActorSystem("prio", com.typesafe.config.ConfigFactory.parseString(
""" prio-dispatcher {
type = "Dispatcher"
mailbox-type = "%s"
}""".format(classOf[PrioritizedMailbox].getName)))
val latch = new java.util.concurrent.CountDownLatch(1)
val a = s.actorOf(Props(new akka.actor.Actor {
latch.await // Just wait here so that the messages are queued up
inside the mailbox
def receive = {
case any ⇒ /*println("Processing: " + any);*/ sender ! any
}
}).withDispatcher("prio-dispatcher"))
implicit val sender = testActor
a ! "pig"
a ! Y
a ! Z
a ! Y
a ! X
a ! Z
a ! X
a ! "dog"

latch.countDown()

Seq(X, X, Y, Y, Z, Z, "pig", "dog") foreach { x => expectMsg(x) }
s.shutdown()

该测试以优异的成绩通过

关于actor - Akka::在 ActorSystem 中使用对事件流具有不同优先级的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12204909/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com