作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
开启:akka-stream-experimental_2.11 1.0。
我们在 Tcp 服务器中使用 Framing.delimiter。当长度大于 maximumFrameLength 的消息到达时,将抛出 FramingException,我们可以从 ActorSubscriber 的 OnError 中捕获它。
服务器代码:
def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int, maxFrameLength: Int)
(implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
val sink = Sink.foreach {
conn: Tcp.IncomingConnection =>
val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))
val targetSink = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = maxFrameLength, allowTruncation = true))
.map(raw ⇒ Message(raw))
.to(Sink(targetSubscriber))
conn.flow.to(targetSink).runWith(Source(Promise().future))
}
val connections = Tcp().bind(address, port)
connections.to(sink).run()
}
订阅者代码:
class TargetSubscriber(target: ActorRef, maxInFlight: Int) extends ActorSubscriber with ActorLogging {
private var inFlight = 0
override protected def requestStrategy = new MaxInFlightRequestStrategy(maxInFlight) {
override def inFlightInternally = inFlight
}
override def receive = {
case OnNext(msg: Message) ⇒
target ! msg
inFlight += 1
case OnError(t) ⇒
inFlight -= 1
log.error(t, "Subscriber encountered error")
case TargetAck(_) ⇒
inFlight -= 1
}
}
问题:低于最大帧长度的消息不会在此传入连接的异常之后流动。杀死客户端并重新运行它工作正常。
ActorSubscriber 不遵守 supervision
跳过坏消息并继续下一条好消息的正确方法是什么?
最佳答案
您是否尝试过对 targetFlow
接收器而不是整个物化器进行监督?我在这里的任何地方都看不到它,我相信它应该直接在该流上设置。
尽管这更多的是猜测而不是科学 ;)
关于scala-2.11 - 如何从 akka.stream.io.Framing$FramingException 中恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31596738/
开启:akka-stream-experimental_2.11 1.0。 我们在 Tcp 服务器中使用 Framing.delimiter。当长度大于 maximumFrameLength 的消息到
我是一名优秀的程序员,十分优秀!