gpt4 book ai didi

scala - Akka websocket - 如何通过服务器关闭连接?

转载 作者:行者123 更新时间:2023-12-04 23:16:16 26 4
gpt4 key购买 nike

所以这是我的 websocket 服务器实现。

val route = get {
pathEndOrSingleSlash {
handleWebSocketMessages(websocketFlow)
}
}

def websocketFlow: Flow[Message, Message, Any] =
Flow[Message]
.collect { case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage) }
.via(chatActorFlow(UUID.randomUUID()))
.map(event => TextMessage.Strict(protocol.serialize(event)))


def chatActorFlow(connectionId: UUID) : Flow[Protocol.Message, Protocol.Event, Any] = {

val sink = Flow[Protocol.Message]
.map(msg => Protocol.SignedMessage(connectionId, msg))
.to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

val source = Source
.mapMaterializedValue {
actor : ActorRef => {
chatRef ! Protocol.OpenConnection(actor, connectionId)
}
}

Flow.fromSinkAndSource(sink, source)
}

我想知道是否有任何方法可以关闭一次 ConnectionClosed 类型的消息的连接由 chatRef 发送?

最佳答案

下面的解决方案允许通过终止由 Source.actorRef 实现的 Actor 来断开来自服务器端的连接。阶段。这只需发送一个 PoisonPill 即可完成。到它。

现在,我仍然不清楚您希望如何在连接时识别“被禁止的”客户端,所以这个例子 - 故意 - 非常简单:服务器在最大数量的客户端连接后断开任何连接。如果您想随时使用任何其他策略踢出客户端,您仍然可以应用相同的逻辑并发送PoisonPill到他们自己的源 Actor 。

object ChatApp extends App {

implicit val system = ActorSystem("chat")
implicit val executor: ExecutionContextExecutor = system.dispatcher
implicit val materializer = ActorMaterializer()

val route = get {
pathEndOrSingleSlash {
handleWebSocketMessages(websocketFlow)
}
}

val maximumClients = 1

class ChatRef extends Actor {
override def receive: Receive = withClients(Map.empty[UUID, ActorRef])

def withClients(clients: Map[UUID, ActorRef]): Receive = {
case SignedMessage(uuid, msg) => clients.collect{
case (id, ar) if id == uuid => ar ! msg
}
case OpenConnection(ar, uuid) if clients.size == maximumClients => ar ! PoisonPill
case OpenConnection(ar, uuid) => context.become(withClients(clients.updated(uuid, ar)))
case CloseConnection(uuid) => context.become(withClients(clients - uuid))
}
}

object Protocol {
case class SignedMessage(uuid: UUID, msg: String)
case class OpenConnection(actor: ActorRef, uuid: UUID)
case class CloseConnection(uuid: UUID)
}

val chatRef = system.actorOf(Props[ChatRef])

def websocketFlow: Flow[Message, Message, Any] =
Flow[Message]
.mapAsync(1) {
case TextMessage.Strict(s) => Future.successful(s)
case TextMessage.Streamed(s) => s.runFold("")(_ + _)
case b: BinaryMessage => throw new Exception("Binary message cannot be handled")
}.via(chatActorFlow(UUID.randomUUID()))
.map(TextMessage(_))

def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = {

val sink = Flow[String]
.map(msg => Protocol.SignedMessage(connectionId, msg))
.to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

val source = Source.actorRef(16, OverflowStrategy.fail)
.mapMaterializedValue {
actor : ActorRef => {
chatRef ! Protocol.OpenConnection(actor, connectionId)
}
}

Flow.fromSinkAndSource(sink, source)
}

Http().bindAndHandle(route, "0.0.0.0", 8080)
.map(_ => println(s"Started server..."))

}

关于scala - Akka websocket - 如何通过服务器关闭连接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41316173/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com