- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
目标是发送WSConnectEvent
一旦客户端连接并且流开始。使用 akka-streams 1.0,我能够通过以下方式完成此操作:
Flow(Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)) {
implicit builder =>
sdpSource =>
// Incoming SDP offer flow
val fromWebsocket = builder.add(Flow[Message].collect {
case TextMessage.Strict(txt) => {
val event = txt.parseJson.convertTo[WSResponseEvent]
WSMessageEvent(callUUID, userUUID, event.id, event.data)
}
})
// Outgoing SDP answer flow
val toWebsocket = builder.add(Flow[WSResponseEvent].map {
case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
})
val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))
// Join events, also sends actor for sending stuff
val merge = builder.add(Merge[CallControlEvent](2))
val actorAsSource = builder.materializedValue.map(actor => WSConnectEvent(callUUID, userUUID, actor))
fromWebsocket ~> merge.in(0)
actorAsSource ~> merge.in(1)
merge ~> callActorSink
sdpSource ~> toWebsocket
(fromWebsocket.inlet, toWebsocket.outlet)
}
WSConnectEvent
信息。我不确定这是因为我的源设置不正确,还是我没有实现
ActorRef
适本地。
val sdpSource = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)
Flow.fromGraph(
GraphDSL.create() { implicit builder =>
// Incoming SDP offer flow
val fromWebsocket = builder.add(Flow[Message].collect {
case TextMessage.Strict(txt) => {
val event = txt.parseJson.convertTo[WSResponseEvent]
WSMessageEvent(callUUID, userUUID, event.id, event.data)
}
})
// Outgoing SDP answer flow
val toWebsocket = builder.add(Flow[WSResponseEvent].map {
case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
})
val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))
// Join events, also sends actor for sending stuff
val merge = builder.add(Merge[CallControlEvent](2))
val actorAsSource = sdpSource.mapMaterializedValue(WSConnectEvent(callUUID, userUUID, _))
fromWebsocket ~> merge.in(0)
actorAsSource ~> merge.in(1)
merge ~> callActorSink
sdpSource ~> toWebsocket
FlowShape(fromWebsocket.in, toWebsocket.out)
}
)
最佳答案
感谢johanandren的帮助,mapMaterializedValue
不是正确的方法,相反,我需要构建一个流来发送 WSConnectEvent
并连接 builder.materializeValue
的输出通过它直到端口中的“合并”,如下所示:
// Join events, also sends actor for sending stuff
val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))
builder.materializedValue ~> actorConnected ~> merge.in(1)
val sdpSource: Source[WSResponseEvent, ActorRef] = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)
Flow.fromGraph(GraphDSL.create(sdpSource) {
implicit builder =>
{ (responseSource) =>
import GraphDSL.Implicits._
// Incoming SDP offer flow
val fromWebsocket = builder.add(Flow[Message].mapAsync(1)(_ match {
case tm: TextMessage => tm.textStream.runFold("")(_ + _).map(Some(_))
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore)
Future.successful(None)
}).collect {
case Some(txt) => {
val event = txt.parseJson.convertTo[WSResponseEvent]
WSMessageEvent(callUUID, userUUID, event.id, event.data)
}
})
// Outgoing SDP answer flow
val toWebsocket = builder.add(Flow[WSResponseEvent].map {
case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
})
val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
val callActorRef = Await.result(callActorSelection.resolveOne(), 2.minutes);
val toCallActor = builder.add(Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID)))
// Join events, also sends actor for sending stuff
val merge = builder.add(Merge[CallControlEvent](2))
val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))
fromWebsocket ~> merge.in(0)
builder.materializedValue ~> actorConnected ~> merge.in(1)
merge ~> toCallActor
responseSource ~> toWebsocket
FlowShape.of(fromWebsocket.in, toWebsocket.out)
}
})
关于scala - 如何在 Akka-Stream 2.0 流程开始时向 ActorRef 发送消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34558522/
我正在尝试使用 Source.actorRef创建 akka.stream.scaladsl.Source 的方法目的。某种形式 import akka.stream.OverflowStrategy
我需要将 ActorRef 保留一段确定的时间。与此同时,我一直在“观察”这个 ActorRef。 由于某种原因,我的 ActorRef 是否可能变得无效,而我却没有收到“死信”? 抱歉,如果这个问题
我开始学习Akka并从official guid下载了示例: 我不明白tell方法第二个参数的用法: 在main方法中写入: howdyGreeter.tell(new WhoToGreet("Akk
我正在学习 Akka,并试图弄清楚如何让 Actor 互相交谈(我们称他们为 A 和 B)。这不是请求/响应场景,A和 B随时互相发送消息。 目前我有两个兄弟 Actor ,他们在两个方向上互相传递消
我们通过扩展 EventSourcedBehavior 将事件源与 Akka Persistence 结合使用。当我们创建持久性 Actor 时,我们通过使用 uuid 给它一个唯一的名称(与我们在
我有以下内容: val future = myActor ?消息 在我的 Actor 中,我收到的消息是这样的: 发件人!响应 如果我做了以下并忽略了响应,是否有任何负面影响? 我的 Actor !消
我尝试从 ActorSelection 获取多个 ActorRef。有人知道是否可能吗? 我的代码 ActorRef actorRef = Await.result(getContext().acto
我不明白关于在回调中关闭 actor ref 的声明。目前我正在使用 public void onReceive(Object message) throws Exception {
根据此链接的程序: Java Akka's ActorRef async issues 是否有任何 Akka native 函数可以用作 actor.tell 的调度函数?我想安排每个时间间隔来自动“
我已经开始使用 Akka 与并发程序进行异步: import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor
我开始使用 Spark Streaming 来处理我收到的实时数据馈送。我的场景是我有一个使用“with ActorHelper”的 Akka Actor 接收器,然后我让我的 Spark 工作做一些
我有两个 Actor 。每个 Actor 都位于不同的 ActorSystem 中。首先缓存第二个ActorRef。第一个 Actor 这样做: actorRef.tell(msg, self())
我想弄清楚我是否使用了传递 Akka ActorRef周围的其他 Actor 不是一种反模式。 我的系统中有几个 Actor 。有些人很长寿( restClientRouter , publisher
我尝试为一个类进行 Spock 测试,我需要检查它是否向 actor 发送消息(例如 statActor)。我知道 Akka 有专门的集成测试库,但对于非常简单的测试来说似乎太多了。所以,我尝试过:
我正在尝试使用 Source.actorRef 向 actor binset 发送消息,但这部分代码: println(s"Before mapping $src") src.mapMateriali
我有一个场景,当 Actor 收到特定消息时,它必须将该消息转发给当时存在的所有子级。 我想我有两个选择来解决这个问题。 getContext().actorSelection("*").forwar
好的,所以我正在使用 SJSON 为 scala 中的案例类编写隐式转换,以使用 akka 框架向远程参与者发送消息。其中一个案例类如下所示 case class Example(id: String
我想用预定义的消息数来完成一个流: Source.actorRef(Integer.MAX_VALUE, OverflowStrategy.fail()) .limit(1) .to(
根据Play documentation on WebSockets建立 WebSocket 的标准方法是使用 ActorFlow.actorRef,它采用返回我的 Actor 的 Props 的函数
我正在学习 Akka,在探索 API 时,我遇到了一些奇怪的东西(至少对我来说)。 tell函数直接在 ActorRef 类上定义。但是,ask函数在 AskSupport 特征中声明。我想不出他们为
我是一名优秀的程序员,十分优秀!