gpt4 book ai didi

akka-stream - 如何在断开连接后清理 akka-http websocket 资源然后重试?

转载 作者:行者123 更新时间:2023-12-04 03:23:51 25 4
gpt4 key购买 nike

下面的代码成功建立了一个 websocket 连接。

websockets 服务器(也是 akk-http)故意使用 Andrew's suggested answer here 关闭连接.
SinkActor下面收到一条 akka.actor.Status.Failure 类型的消息所以我知道从服务器到客户端的消息流已经中断。

我的问题是......我的客户端应该如何重新建立 websocket 连接?有 source.via(webSocketFlow).to(sink).run()完全的?

清理资源和重试 websocket 连接的最佳实践是什么?

class ConnectionAdminActor extends Actor with ActorLogging {
implicit val system: ActorSystem = context.system
implicit val flowMaterializer = ActorMaterializer()

private val sinkActor = context.system.actorOf(Props[SinkActor], name = "SinkActor")

private val sink = Sink.actorRefWithAck[Message](sinkActor, StartupWithActor(self.path), Ack, Complete)

private val source = Source.actorRef[TextMessage](10, OverflowStrategy.dropHead).mapMaterializedValue {
ref => {
self ! StartupWithActor(ref.path)
ref
}
}

private val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080"))

source
.via(webSocketFlow)
.to(sink)
.run()

最佳答案

试试 recoverWithRetries组合器(文档 here)。

这允许您提供替代方案 Source您的管道将切换到,以防上游出现故障。在最简单的情况下,您可以重复使用相同的 Source ,它应该发出一个新的连接。

val wsSource = source via webSocketFlow

wsSource
.recoverWithRetries(attempts = -1, {case e: Throwable => wsSource})
.to(sink)

注意
  • attempts = -1将无限期地重试重新连接
  • 部分函数允许更精细地控制哪些异常可以触发重新连接
  • 关于akka-stream - 如何在断开连接后清理 akka-http websocket 资源然后重试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41850528/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com