gpt4 book ai didi

scala - Akka HTTP 客户端 websocket 流的定义

转载 作者:行者123 更新时间:2023-12-04 01:43:42 37 4
gpt4 key购买 nike

我过去曾成功使用过 Akka Streams,但是,我目前很难理解为什么 Akka-HTTP 中的客户端 Websocket Streams 被定义并按 documentation 中所示的那样工作。 .

由于 WebSocket 连接允许全双工通信,我希望这样的连接由 Akka HTTP 中的两个单独的流表示,一个用于传入流量,一个用于传出流量。确实,文档说明了以下内容:

A WebSocket consists of two streams of messages [...]



它进一步指出传入的消息由 Sink 表示。和由 Source 发出的消息.这是我的第一个困惑点——当使用两个单独的流时,你会期望总共处理两个源和两个接收器,而不是每种类型一个。目前,我的猜测是传入流的源以及传出流的接收器对开发人员来说并没有多大用处,因此只是“隐藏”。

但是,将所有内容连接在一起时确实会让人感到困惑(请参阅上面链接的文档)。

使用 singleWebSocketRequest 时有问题的部分:
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

或使用 webSocketClientFlow 时的相同部分:
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()

这与我目前对流工作流程的理解相矛盾。
  • 为什么我要结合 Source用于传出消息和 Sink对于传入的消息?上面的代码看起来像是在向自己发送消息,而不是向服务器发送消息。
  • 此外,Flow[Message, Message, ...] 的语义是什么? ?将传出消息转换为传入消息似乎没有意义。
  • 不应该有两个流而不是一个流吗?

  • 感谢您对提高我的理解的任何帮助,谢谢。

    编辑:

    我使用 Source 没有问题和 Sink并通过 WebSocket 发送数据,我只是想了解为什么阶段的接线是这样完成的。

    最佳答案

    WebSocket 确实由两个单独的流组成,只是这些流(可能)不在同一个 JVM 上。

    您有两个对等点进行通信,一个是服务器,另一个是客户端,但是从已建立的 WebSocket 连接的角度来看,差异不再重要。一个数据流是对等体 1 向对等体 2 发送消息,另一个数据流是对等体 2 向对等体 1 发送消息,然后这两个对等体之间存在网络边界。如果您一次查看一个对等点,则对等点 1 从对等点 2 接收消息,而在第二个流中,对等点 1 正在向对等点 2 发送消息。

    每个对等点都有一个接收部分的 Sink 和一个发送部分的 Source。实际上,您确实总共有两个 Sources 和两个 Sinks,只是不在同一个 ActorSystem 上(为了便于解释,假设两个对等点都是在 Akka HTTP 中实现的)。 peer 1 的 Source 连接到 peer 2 的 Sink,peer 2 的 Source 连接到 peer 1 的 Sink。

    因此,您编写了一个描述如何处理通过第一个流传入消息的 Sink 和一个描述如何通过第二个流发送消息的 Source。通常,您希望根据接收到的消息生成消息,因此您可以将这两者连接在一起,并通过描述如何响应和传入消息并生成任意数量的传出消息的不同流来路由消息。 Flow[Message, Message, _]并不意味着您将传出消息转换为传入消息,而是将传入消息转换为传出消息。
    webSocketFlow是一个典型的异步边界,一个代表另一个对等点的流。它将传出消息“转换”为传入消息,方法是将它们发送到另一个对等点并发出其他对等点发送的任何内容。

    val flow: Flow[Message, Message, Future[Done]] =
    Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

    此流程是您对等方的两个流程中的一半:
  • [message from other peer]连接到 printSink
  • helloSource连接到 [message to the other peer]

  • 传入消息和传出消息之间没有关系,您只需打印收到的所有内容并发送一个“hello world!”。实际上,由于源在一条消息后完成,WebSocket 连接也会关闭,但是如果将 Source 替换为例如 Source.repeat ,你会不断地发送(洪水,真的)“你好,世界!”无论传入消息的速率如何。
    val (upgradeResponse, closed) =
    outgoing
    .viaMat(webSocketFlow)(Keep.right)
    .toMat(incoming)(Keep.both)
    .run()

    在这里,您可以获取来自 outgoing 的所有内容,即您要发送的消息,将其路由到 webSocketFlow ,它通过与其他对等方通信来“转换”消息,并将每个接收到的消息生成为 incoming .通常,您有一个有线协议(protocol),您可以在其中将案例类/pojo/dto 消息编码和解码为有线格式。
    val encode: Flow[T, Message, _] = ???
    val decode: Flow[Message, T, _] = ???

    val upgradeResponse = outgoing
    .via(encode)
    .viaMat(webSocketFlow)(Keep.right)
    .via(decode)
    .to(incoming)
    .run()

    或者您可以想象某种聊天服务器(啊,websockets 和聊天),它广播和合并来自多个客户端的消息和向多个客户端发送消息。这应该从任何客户端获取任何消息并将其发送到每个客户端(仅用于演示,未经测试,可能不是您想要的实际聊天服务器):
    val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ???
    val chatClientSenders: Seq[Source[Message, NotUsed]] = ???

    // each of those receivers/senders could be paired in their own websocket compatible flow
    val chatSockets: Seq[Flow[Message, Message, NotUsed]] =
    (chatClientReceivers, chatClientSenders).zipped.map(
    (outgoingSendToClient, incomingFromClient) =>
    Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient))

    val toClients: Graph[SinkShape[Message], NotUsed] =
    GraphDSL.create() {implicit b =>
    import GraphDSL.Implicits._

    val broadcast = b.add(Broadcast[Message](chatClientReceivers.size))

    (broadcast.outArray, chatClientReceivers).zipped
    .foreach((bcOut, client) => bcOut ~> b.add(client).in)

    SinkShape(broadcast.in)
    }

    val fromClients: Graph[SourceShape[Message], NotUsed] =
    GraphDSL.create() {implicit b =>
    import GraphDSL.Implicits._

    val merge = b.add(Merge[Message](chatClientSenders.size))

    (merge.inSeq, chatClientSenders).zipped
    .foreach((mIn, client) => b.add(client).out ~> mIn)

    SourceShape(merge.out)
    }

    val upgradeResponse: Future[WebSocketUpgradeResponse] =
    Source.fromGraph(fromClients)
    .viaMat(webSocketFlow)(Keep.right)
    .to(toClients)
    .run()

    希望这个对你有帮助。

    关于scala - Akka HTTP 客户端 websocket 流的定义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45149663/

    37 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com