gpt4 book ai didi

scala - Akka-Http Websockets : How to Send consumers the same stream of data

转载 作者:行者123 更新时间:2023-12-04 22:58:13 27 4
gpt4 key购买 nike

我有一个客户端可以连接到的 WebSocket 我还有一个使用 akka-streams 的数据流。我怎样才能让所有客户端都获得相同的数据。目前,他们似乎在争夺数据。

谢谢

最佳答案

您可以做的一种方法是让一个 Actor 扩展 ActorPublisher 并让它订阅
到一些消息。

class MyPublisher extends ActorPublisher[MyData]{

override def preStart = {
context.system.eventStream.subscribe(self, classOf[MyData])
}

override def receive: Receive = {

case msg: MyData ⇒
if (isActive && totalDemand > 0) {
// Pushes the message onto the stream
onNext(msg)
}
}
}

object MyPublisher {
def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher())
}

case class MyData(data:String)

然后,您可以使用该 actor 作为流的源:
val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExcutionContext))

然后,您可以从该数据源创建一个流并应用转换将数据转换为 websocket 消息
val myFlow = Flow.fromSinkAndSource(Sink.ignore, dataSource map {d => TextMessage.Strict(d.data)})

然后您可以在路由处理中使用该流程。
path("readings") {
handleWebsocketMessages(myFlow)
}

从原始流的处理中,您可以将数据发布到事件流,并且该参与者的任何实例都将获取它并将其放入为其 websocket 提供服务的流中。
  val actorSystem = ActorSystem("foo")

val otherSource = Source.fromIterator(() => List(MyData("a"), MyData("b")).iterator)

otherSource.runForeach { msg ⇒ actorSystem.eventStream.publish(MyData("data"))}

然后,每个套接字将拥有自己的actor 实例,以向其提供所有来自单个源的数据。

关于scala - Akka-Http Websockets : How to Send consumers the same stream of data,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35276254/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com