gpt4 book ai didi

scala - 使用 Akka 的简单服务器推送广播流程

转载 作者:行者123 更新时间:2023-12-01 01:42:50 25 4
gpt4 key购买 nike

我正在努力实现一个 - 相当简单 - Akka 流程。
这是我认为我需要的:

akka-flow

我有一个服务器和 n 个客户端,希望能够通过向客户端广播消息 (JSON) 来对外部事件使用react。客户可以随时注册/注销。

例如:

  • 1 位客户已注册
  • 服务器抛出一个事件(“Hello World!”)
  • 服务器广播“Hello World!”给所有客户(一位客户)
  • 一个新客户端打开一个 websocket 连接
  • 服务器抛出另一个事件(“Hello Akka!”)
  • 服务器广播“Hello Akka!”给所有客户(两个客户)

  • 这是我到目前为止所拥有的:

    def route: Route = {
    val register = path("register") {
    // registration point for the clients
    handleWebSocketMessages(serverPushFlow)
    }
    }

    // ...

    def broadcast(msg: String): Unit = {
    // use the previously created flow to send messages to all clients
    // ???
    }

    // my broadcast sink to send messages to the clients
    val broadcastSink: Sink[String, Source[String, NotUsed]] = BroadcastHub.sink[String]

    // a source that emmits simple strings
    val simpleMsgSource = Source(Nil: List[String])

    def serverPushFlow = {
    Flow[Message].mapAsync(1) {
    case TextMessage.Strict(text) => Future.successful(text)
    case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _)
    }
    .via(Flow.fromSinkAndSource(broadcastSink, simpleMsgSource))
    .map[Message](string => TextMessage(string))
    }

    最佳答案

    为了能够使用broadcastHub,您必须定义两个流。一个运行你的 websocket TextMessagebroadcastHub .您必须运行它,它会生成一个连接到每个客户端的源。

    这是在简单的可运行应用程序中描述的这个概念。

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{BroadcastHub, Sink, Source}
    import org.slf4j.LoggerFactory

    import scala.concurrent.duration._

    object BroadcastSink extends App {

    private val logger = LoggerFactory.getLogger("logger")

    implicit val actorSystem = ActorSystem()
    implicit val actorMaterializer = ActorMaterializer()

    val broadcastSink: Sink[String, Source[String, NotUsed]] =
    BroadcastHub.sink[String]

    val simpleMsgSource = Source.tick(500.milli, 500.milli, "Single Message")

    val sourceForClients: Source[String, NotUsed] = simpleMsgSource.runWith(broadcastSink)

    sourceForClients.to(Sink.foreach(t => logger.info(s"Client 1: $t"))).run()
    Thread.sleep(1000)

    sourceForClients.to(Sink.foreach(t => logger.info(s"Client 2: $t"))).run()
    Thread.sleep(1000)

    sourceForClients.to(Sink.foreach(t => logger.info(s"Client 3: $t"))).run()
    Thread.sleep(1000)

    sourceForClients.to(Sink.foreach(t => logger.info(s"Client 4: $t"))).run()
    Thread.sleep(1000)

    actorSystem.terminate()
    }

    打印
    10:52:01.774 Client 1: Single Message
    10:52:02.273 Client 1: Single Message
    10:52:02.273 Client 2: Single Message
    10:52:02.773 Client 2: Single Message
    10:52:02.773 Client 1: Single Message
    10:52:03.272 Client 3: Single Message
    10:52:03.272 Client 2: Single Message
    10:52:03.272 Client 1: Single Message
    10:52:03.772 Client 1: Single Message
    10:52:03.772 Client 3: Single Message
    10:52:03.773 Client 2: Single Message
    10:52:04.272 Client 2: Single Message
    10:52:04.272 Client 4: Single Message
    10:52:04.272 Client 1: Single Message
    10:52:04.273 Client 3: Single Message
    10:52:04.772 Client 1: Single Message
    10:52:04.772 Client 2: Single Message
    10:52:04.772 Client 3: Single Message
    10:52:04.772 Client 4: Single Message
    10:52:05.271 Client 4: Single Message
    10:52:05.271 Client 1: Single Message
    10:52:05.271 Client 3: Single Message
    10:52:05.272 Client 2: Single Message

    如果事先知道客户,则不需要 BrodacastHub,可以使用 alsoTo方法:
      def webSocketHandler(clients: List[Sink[Message, NotUsed]]): Flow[Message, Message, Any] = {
    val flow = Flow[Message]
    clients.foldLeft(flow) {case (fl, client) =>
    fl.alsoTo(client)
    }
    }

    关于scala - 使用 Akka 的简单服务器推送广播流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54687116/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com