gpt4 book ai didi

scala - Akka Actor pubsub : subscribe using multiple parameters in one EventStream?

转载 作者:行者123 更新时间:2023-12-04 18:12:05 27 4
gpt4 key购买 nike

作为引用,这个问题源于 Scala method performance (collect or foreach or other) looping through sockets?

我在一个actor中存储一个对websocket的引用,然后将该actor订阅到一个Akka EventStream:

val socketActor = system.actorOf(Props(new Actor {
val socket = WebSocketConnection

def receive = {
case d: AppMessage ⇒ socket.send(d)
}
}))
system.eventStream.subscribe(socketActor, classOf[AppMessage])

让我烦恼的是,我可以用 EventStream 制作的唯一分类器是类类型。因此,如果您想将消息路由到不同的参与者,比如基于 userId,您是否需要创建多个 EventStream 并手动构建 EventBus,或者这里有什么我遗漏的东西?

如果我能做一些简单的事情就好了:
system.eventStream.subscribe(socketActor, Map("userId" -> userId, "teamId" -> teamId) )

这可能只是一个概念问题,因为我不太确定 EventStream 代表什么。

最佳答案

这是我基于 ActorEventBus 的解决方案基于此要点:https://gist.github.com/3757237

我发现这比处理 EventStreams 更易于维护。也许将来需要多个 EventStream,但此时它很容易支持当前的基础架构。

消息总线

首先,MessageBus 根据 PubSub channel 处理到封装在 actor 中的套接字的传出消息:

case class MessageEvent(val channel:String, val message:String)

/**
* message bus to route messages to their appropriate contexts
*/
class MessageBus extends ActorEventBus with LookupClassification {

type Event = MessageEvent
type Classifier = String

protected def mapSize(): Int = {
10
}

protected def classify(event: Event): Classifier = {
event.channel
}

protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event
}

}


object MessageBus {

val actorSystem = ActorSystem("contexts")
val Bus = new MessageBus

/**
* create an actor that stores a browser socket
*/
def browserSocketContext(s: WebSocketConnection, userId: Long, teamId: Long) = {
val subscriber = actorSystem.actorOf(Props(new BrowserSocket(s,userId,teamId)))

Bus.subscribe( subscriber, "/app/socket/%s" format s.toString)
Bus.subscribe( subscriber, "/app/browser/u/%s" format userId )
Bus.subscribe( subscriber, "/app/browser/t/%s" format teamId )
Bus.subscribe( subscriber, "/app/browser" )
}
}

带有 Actor 的套接字访问

这是实际包含套接字的 Actor :
/**
* actor wrapping access for browser socket
*/
class BrowserSocket(
val s: WebSocketConnection,
val userId: Long,
val teamId: Long

) extends Actor {

def receive = {
case payload:MessageEvent =>
s.send(payload.message)

case ping:MessagePing =>
s.ping(ping.data)

}

}

关于scala - Akka Actor pubsub : subscribe using multiple parameters in one EventStream?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12415740/

27 4 0