
scala - Processing an Akka stream with a one-time header

Reprinted. Author: 行者123. Updated: 2023-12-04 14:29:12

I have an application that accepts TCP socket connections, which will send data in the following form:

n{json}bbbbbbbbbb...

where n is the byte length of the json that follows, and the json might look like {'splitEvery' : 5}, which tells me how to break up and process the possibly endless string of bytes that comes after it.
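For concreteness, the length prefix is the UTF-8 byte length of the JSON header. A minimal sketch of the framing, assuming the double-quoted JSON form used in the answer below (the payload bytes here are made up for illustration):

```scala
// Sketch of the wire format: a decimal byte-length prefix, the JSON header,
// then the raw payload bytes. The prefix is the byte length of the JSON only.
val json   = """{"splitEvery": 5}"""
val prefix = json.getBytes("UTF-8").length  // 17 bytes
val frame  = s"$prefix$json" + "aaaaabbbbb"
println(frame)  // 17{"splitEvery": 5}aaaaabbbbb
```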

I want to process this stream using Akka in Scala. I think streams are the right tool for the job, but I'm having a hard time finding an example that uses a stream with distinct processing stages. Most streams seem to do the same thing over and over, like the prefixAndTail example here. That is very close to how I want to handle the n{json} part of the stream, but the difference is that I only need to do that once per connection and then move on to a different processing "stage".

Can anyone point me to an example of an Akka stream with distinct stages?

Best Answer

Here is a GraphStage that processes a stream of ByteString:

  • extracts the chunk size from the header
  • emits ByteStrings of the specified chunk size
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString

class PreProcessor extends GraphStage[FlowShape[ByteString, ByteString]] {

  val in: Inlet[ByteString] = Inlet("ParseHeader.in")
  val out: Outlet[ByteString] = Outlet("ParseHeader.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      var buffer = ByteString.empty
      var chunkSize: Option[Int] = None
      private var upstreamFinished = false

      private val headerPattern = """^\d+\{"splitEvery": (\d+)\}""".r

      /**
       * @param data The data to parse.
       * @return The chunk size and header size if the header
       *         could be parsed.
       */
      def parseHeader(data: ByteString): Option[(Int, Int)] =
        headerPattern.
          findFirstMatchIn(data.decodeString("UTF-8")).
          map { mtch => (mtch.group(1).toInt, mtch.end) }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          if (isClosed(in)) emit()
          else pull(in)
        }
      })

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          buffer ++= elem
          if (chunkSize.isEmpty) {
            parseHeader(buffer) foreach { case (chunk, headerSize) =>
              chunkSize = Some(chunk)
              buffer = buffer.drop(headerSize)
            }
          }
          emit()
        }

        override def onUpstreamFinish(): Unit = {
          upstreamFinished = true
          if (chunkSize.isEmpty || buffer.isEmpty) completeStage()
          else if (isAvailable(out)) emit()
        }
      })

      private def continue(): Unit =
        if (isClosed(in)) completeStage()
        else pull(in)

      private def emit(): Unit = {
        chunkSize match {
          case None => continue()
          case Some(size) =>
            if (upstreamFinished && buffer.isEmpty ||
                !upstreamFinished && buffer.size < size) {
              continue()
            } else {
              val (chunk, nextBuffer) = buffer.splitAt(size)
              buffer = nextBuffer
              push(out, chunk)
            }
        }
      }
    }
}
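The regex-plus-splitAt logic can also be exercised in isolation. Below is a hypothetical standalone sketch, not part of the stage above: it uses plain String instead of ByteString and assumes the whole payload is already buffered, which is exactly what the GraphStage avoids having to assume:

```scala
import scala.util.matching.Regex

// Same header pattern as the stage above.
val headerPattern: Regex = """^\d+\{"splitEvery": (\d+)\}""".r

// Parse the header, then cut the remainder into chunkSize pieces.
def chunks(data: String): List[String] =
  headerPattern.findFirstMatchIn(data) match {
    case Some(m) =>
      val chunkSize = m.group(1).toInt
      data.drop(m.end).grouped(chunkSize).toList
    case None => Nil
  }

println(chunks("""17{"splitEvery": 5}aaaaabbbbbccc"""))  // List(aaaaa, bbbbb, ccc)
```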

And a test case illustrating the usage:

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.Source
import akka.util.ByteString
import org.scalatest._

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

class PreProcessorSpec extends FlatSpec {

  implicit val system = ActorSystem("Test")
  implicit val materializer = ActorMaterializer()

  val random = new Random

  "PreProcessor" should "re-chunk the stream as specified in the header" in {

    def splitRandom(s: String, n: Int): List[String] = s match {
      case "" => Nil
      case s =>
        val (head, tail) = s splitAt random.nextInt(n)
        head :: splitRandom(tail, n)
    }

    val input = """17{"splitEvery": 5}aaaaabbbbbcccccddd"""

    val strings = splitRandom(input, 7)
    println(strings.map(s => s"[$s]").mkString(" ") + "\n")

    val future = Source.fromIterator(() => strings.iterator).
      map(ByteString(_)).
      via(new PreProcessor()).
      map(_.decodeString("UTF-8")).
      runForeach(println)

    Await.result(future, 5.seconds)
  }

}

Sample output:

[17{"] [splitE] [very"] [] [: 5}] [aaaaa] [bbb] [bbcccc] [] [cddd]

aaaaa
bbbbb
ccccc
ddd
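Note how the fragment boundaries in the first output line are arbitrary: splitRandom may cut inside the header or mid-chunk, and some fragments are even empty, yet the stage still emits clean 5-byte chunks. The invariant the test relies on is that concatenating the fragments always restores the input; a sketch of that property, using a fixed seed for reproducibility (the spec above uses an unseeded Random):

```scala
import scala.util.Random

val random = new Random(42) // fixed seed, hypothetical choice for a repeatable run

// Same helper as in the spec: cut the string at random points below n.
def splitRandom(s: String, n: Int): List[String] = s match {
  case "" => Nil
  case s =>
    val (head, tail) = s.splitAt(random.nextInt(n))
    head :: splitRandom(tail, n)
}

val input = """17{"splitEvery": 5}aaaaabbbbbcccccddd"""

// Whatever the fragmentation, the concatenation is the original input.
assert(splitRandom(input, 7).mkString == input)
```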

Regarding "scala - Processing an Akka stream with a one-time header", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38925326/
