gpt4 book ai didi

json - 使用 AKKA Stream 解码分块 JSON

转载 作者:行者123 更新时间:2023-12-01 21:40:02 26 4
gpt4 key购买 nike

我有一个来自输入文件的 Source[ByteString, _] ,其中包含 3 行,如下所示(实际上输入是具有连续流的 TCP 套接字):

{"a":[2
33]
}

现在的问题是我想将其解析为 Source[ChangeMes​​sage,_],但是我找到的唯一示例处理的是每行都有完整的 JSON 消息的情况,而不是每个 JSON 消息可以分为多行。

我发现的一个例子是 this库,但它需要 }, 作为最后一个字符,即每行一个 JSON。下面的示例显示了此设置。

"My decoder" should "decode chunked json" in {
implicit val sys = ActorSystem("test")
implicit val mat = ActorMaterializer()
val file = Paths.get("chunked_json_stream.json")
val data = FileIO.fromPath(file)
.via(CirceStreamSupport.decode[ChangeMessage])
.runWith(TestSink.probe[ChangeMessage])
.request(1)
.expectComplete()
}

另一种替代方法是使用折叠和平衡 } 并且仅在整个 JSON 完成时才发出。这样做的问题是折叠运算符仅在流完成时发出,并且由于这是一个连续流,我不能在这里使用它。

My question is: What is the fastest way to parse chunked JSON streams in AKKA Stream and are there any available software that already does this? If possible I would like to use circe

最佳答案

作为 knutwalker/akka-stream-json 的文档说:

This flow even supports parsing multiple json documents in whatever fragmentation they may arrive, which is great for consuming stream/sse based APIs.

在您的情况下,您所需要做的就是分隔传入的字节字符串:

"My decoder" should "decode chunked json" in {
implicit val sys = ActorSystem("test")
implicit val mat = ActorMaterializer()
val file = Paths.get("chunked_json_stream.json")

val sourceUnderTest =
FileIO.fromPath(file)
.via(Framing.delimiter(ByteString("\n"), 8192, allowTruncation = true))
.via(CirceStreamSupport.decode[ChangeMessage])

sourceUnderTest
.runWith(TestSink.probe[ChangeMessage])
.request(1)
.expectNext(ChangeMessage(List(233)))
.expectComplete()
}

这是因为从文件读取时,ByteString 元素包含多行,因此 Circe 无法解析格式错误的 json。当您用换行符分隔时,流中的每个元素都是单独的行,因此 Circe 能够使用上述功能对其进行解析。

关于json - 使用 AKKA Stream 解码分块 JSON,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44196744/

26 4 0