- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我过去曾成功使用过 Akka Streams,但是,我目前很难理解为什么 Akka-HTTP 中的客户端 Websocket Streams 被定义并按 documentation 中所示的那样工作。 .
由于 WebSocket 连接允许全双工通信,我希望这样的连接由 Akka HTTP 中的两个单独的流表示,一个用于传入流量,一个用于传出流量。确实,文档说明了以下内容:
A WebSocket consists of two streams of messages [...]
Sink
表示。和由
Source
发出的消息.这是我的第一个困惑点——当使用两个单独的流时,你会期望总共处理两个源和两个接收器,而不是每种类型一个。目前,我的猜测是传入流的源以及传出流的接收器对开发人员来说并没有多大用处,因此只是“隐藏”。
singleWebSocketRequest
时有问题的部分:
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
webSocketClientFlow
时的相同部分:
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
Source
用于传出消息和 Sink
对于传入的消息?上面的代码看起来像是在向自己发送消息,而不是向服务器发送消息。 Flow[Message, Message, ...]
的语义是什么? ?将传出消息转换为传入消息似乎没有意义。 Source
没有问题和
Sink
并通过 WebSocket 发送数据,我只是想了解为什么阶段的接线是这样完成的。
最佳答案
WebSocket 确实由两个单独的流组成,只是这些流(可能)不在同一个 JVM 上。
您有两个对等点进行通信,一个是服务器,另一个是客户端,但是从已建立的 WebSocket 连接的角度来看,差异不再重要。一个数据流是对等体 1 向对等体 2 发送消息,另一个数据流是对等体 2 向对等体 1 发送消息,然后这两个对等体之间存在网络边界。如果您一次查看一个对等点,则对等点 1 从对等点 2 接收消息,而在第二个流中,对等点 1 正在向对等点 2 发送消息。
每个对等点都有一个接收部分的 Sink 和一个发送部分的 Source。实际上,您确实总共有两个 Sources 和两个 Sinks,只是不在同一个 ActorSystem 上(为了便于解释,假设两个对等点都是在 Akka HTTP 中实现的)。 peer 1 的 Source 连接到 peer 2 的 Sink,peer 2 的 Source 连接到 peer 1 的 Sink。
因此,您编写了一个描述如何处理通过第一个流传入消息的 Sink 和一个描述如何通过第二个流发送消息的 Source。通常,您希望根据接收到的消息生成消息,因此您可以将这两者连接在一起,并通过描述如何响应和传入消息并生成任意数量的传出消息的不同流来路由消息。 Flow[Message, Message, _]
并不意味着您将传出消息转换为传入消息,而是将传入消息转换为传出消息。webSocketFlow
是一个典型的异步边界,一个代表另一个对等点的流。它将传出消息“转换”为传入消息,方法是将它们发送到另一个对等点并发出其他对等点发送的任何内容。
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
[message from other peer]
连接到 printSink
helloSource
连接到 [message to the other peer]
Source.repeat
,你会不断地发送(洪水,真的)“你好,世界!”无论传入消息的速率如何。
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
outgoing
的所有内容,即您要发送的消息,将其路由到
webSocketFlow
,它通过与其他对等方通信来“转换”消息,并将每个接收到的消息生成为
incoming
.通常,您有一个有线协议(protocol),您可以在其中将案例类/pojo/dto 消息编码和解码为有线格式。
val encode: Flow[T, Message, _] = ???
val decode: Flow[Message, T, _] = ???
val upgradeResponse = outgoing
.via(encode)
.viaMat(webSocketFlow)(Keep.right)
.via(decode)
.to(incoming)
.run()
val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ???
val chatClientSenders: Seq[Source[Message, NotUsed]] = ???
// each of those receivers/senders could be paired in their own websocket compatible flow
val chatSockets: Seq[Flow[Message, Message, NotUsed]] =
(chatClientReceivers, chatClientSenders).zipped.map(
(outgoingSendToClient, incomingFromClient) =>
Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient))
val toClients: Graph[SinkShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Message](chatClientReceivers.size))
(broadcast.outArray, chatClientReceivers).zipped
.foreach((bcOut, client) => bcOut ~> b.add(client).in)
SinkShape(broadcast.in)
}
val fromClients: Graph[SourceShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Message](chatClientSenders.size))
(merge.inSeq, chatClientSenders).zipped
.foreach((mIn, client) => b.add(client).out ~> mIn)
SourceShape(merge.out)
}
val upgradeResponse: Future[WebSocketUpgradeResponse] =
Source.fromGraph(fromClients)
.viaMat(webSocketFlow)(Keep.right)
.to(toClients)
.run()
关于scala - Akka HTTP 客户端 websocket 流的定义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45149663/
这个问题在这里已经有了答案: Why filter() after flatMap() is "not completely" lazy in Java streams? (8 个答案) 关闭 6
我正在创建一个应用程序来从 Instagram 收集数据。我正在寻找像 Twitter 流 API 这样的流 API,这样我就可以自动实时收集数据而无需发送请求。 Instagram 有类似的 API
我正在使用 Apache Commons 在 Google App Engine 中上传一个 .docx 文件,如此链接中所述 File upload servlet .上传时,我还想使用 Apach
我尝试使用 DynamoDB 流和 AWS 提供的 Java DynamoDB 流 Kinesis 适配器捕获 DynamoDB 表更改。我正在 Scala 应用程序中使用 AWS Java 开发工具
我目前有一个采用 H.264 编码的 IP 摄像机流式视频 (RTSP)。 我想使用 FFmpeg 将此 H.264 编码流转换为另一个 RTSP 流,但 MPEG-2 编码。我该怎么做?我应该使用哪
Redis 流是否受益于集群模式?假设您有 10 个流,它们是分布在整个集群中还是都分布在同一节点上?我计划使用 Redis 流来实现真正的高吞吐量(200 万条消息/秒),所以我担心这种规模的 Re
这件事困扰了我一段时间。 所以我有一个 Product 类,它有一个 Image 列表(该列表可能为空)。 我想做 product.getImages().stream().filter(...) 但
是否可以使用 具有持久存储的 Redis 流 还是流仅限于内存数据? 我知道可以将 Redis 与核心数据结构的持久存储一起使用,但我已经能够理解是否也可以使用 Redis 中的流的持久存储。 最佳答
我开始学习 Elixir 并遇到了一个我无法轻松解决的挑战。 我正在尝试创建一个函数,该函数接受一个 Enumerable.t 并返回另一个 Enumerable.t ,其中包含下 n 个项目。它与
我试图从 readLine 调用创建一个无限的字符串流: import java.io.{BufferedReader, InputStreamReader} val in = new Buffere
你能帮我使用 Java 8 流 API 编写以下代码吗? SuperUser superUser = db.getSuperUser; for (final Client client : super
我正在尝试服用补品routeguide tutorial,并将客户端变成rocket服务器。我只是接受响应并将gRPC转换为字符串。 service RouteGuide { rpc GetF
流程代码可以是run here. 使用 flow,我有一个函数,它接受一个键值对对象并获取它的值 - 它获取的值应该是字符串、数字或 bool 值。 type ValueType = string
如果我有一个函数返回一个包含数据库信息的对象或一个空对象,如下所示: getThingFromDB: async function(id:string):Promise{ const from
我正在尝试使用javascript api和FB.ui将ogg音频文件发布到流中, 但是我不知道该怎么做。 这是我给FB.ui的电话: FB.ui( { method: '
我正在尝试删除工作区(或克隆它以使其看起来像父工作区,但我似乎两者都做不到)。但是,当我尝试时,我收到此消息:无法删除工作区 test_workspace,因为它有一个非空的默认组。 据我所知,这意味
可以使用 Stream|Map 来完成此操作,这样我就不需要将结果放入外部 HashMap 中,而是使用 .collect(Collectors.toMap(...)); 收集结果? Map rep
当我们从集合列表中获取 Stream 时,幕后到底发生了什么?我发现很多博客都说Stream不存储任何数据。如果这是真的,请考虑代码片段: List list = new ArrayList(); l
我对流及其工作方式不熟悉,我正在尝试获取列表中添加的特定对象的出现次数。 我找到了一种使用Collections来做到这一点的方法。其过程如下: for (int i = 0; i p.conten
我希望将一个 map 列表转换为另一个分组的 map 列表。 所以我有以下 map 列表 - List [{ "accId":"1", "accName":"TestAcc1", "accNumber
我是一名优秀的程序员,十分优秀!