gpt4 book ai didi

java - 快速解析具有较大可跳过区域的 JSON

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:43:55 24 4
gpt4 key购买 nike

我有一个具有以下结构的 JSON(真实世界的例子在这里 https://gist.github.com/PavelPenkov/3432fe522e02aa3a8a597020d4ee7361 ):

{
"metadata": { /* Huge TYPED object */ },
"payload": { /* Small flat UNTYPED object */
"field_1": 1
"field_2": "Alice"
}
}

我想尽快提取 payload 部分,文件很大,将其解析为案例类相当慢(在我的笔记本电脑上为 5000 op/s)。到目前为止,我已经尝试过了

  1. 用 Jackson 将整个文档解析为案例类。

  2. 使用 Jackson 解析为 AST 并仅提取 payload 字段 - 速度稍快。

  3. scala-jsoniter 虽然它可能可以更快地解析类型化部分,但它无法按设计解析非类型化字段。

是否有任何其他选项可以从 Java 或(最好)Scala 访问?

最佳答案

跳过不需要的 JSON 值是 jsoniter-scala 的亮点。是的,它不为 JSON 提供 AST 模型,但您可以自己构建它或使用第三方库提供的模型。以下是 circe AST 的自定义编解码器示例:

package io.circe

import java.util

import com.github.plokhotnyuk.jsoniter_scala.core._
import io.circe.Json._

object CirceJsoniter {
implicit val codec: JsonValueCodec[Json] = new JsonValueCodec[Json] {
override def decodeValue(in: JsonReader, default: Json): Json = {
var b = in.nextToken()
if (b == 'n') in.readNullOrError(default, "expected `null` value")
else if (b == '"') {
in.rollbackToken()
new JString(in.readString(null))
} else if (b == 'f' || b == 't') {
in.rollbackToken()
if (in.readBoolean()) Json.True
else Json.False
} else if ((b >= '0' && b <= '9') || b == '-') {
new JNumber({
in.rollbackToken()
in.setMark() // TODO: add in.readNumberAsString() to Core API of jsoniter-scala
try {
do b = in.nextByte()
while (b >= '0' && b <= '9')
} catch { case _: JsonReaderException => /* ignore end of input error */} finally in.rollbackToMark()
if (b == '.' || b == 'e' || b == 'E') new JsonDouble(in.readDouble())
else new JsonLong(in.readLong())
})
} else if (b == '[') {
new JArray(if (in.isNextToken(']')) Vector.empty
else {
in.rollbackToken()
var x = new Array[Json](4)
var i = 0
do {
if (i == x.length) x = java.util.Arrays.copyOf(x, i << 1)
x(i) = decodeValue(in, default)
i += 1
} while (in.isNextToken(','))
(if (in.isCurrentToken(']'))
if (i == x.length) x
else java.util.Arrays.copyOf(x, i)
else in.arrayEndOrCommaError()).to[Vector]
})
} else if (b == '{') {
new JObject(if (in.isNextToken('}')) JsonObject.empty
else {
val x = new util.LinkedHashMap[String, Json]
in.rollbackToken()
do x.put(in.readKeyAsString(), decodeValue(in, default))
while (in.isNextToken(','))
if (!in.isCurrentToken('}')) in.objectEndOrCommaError()
JsonObject.fromLinkedHashMap(x)
})
} else in.decodeError("expected JSON value")
}

override def encodeValue(x: Json, out: JsonWriter): Unit = x match {
case JNull => out.writeNull()
case JString(s) => out.writeVal(s)
case JBoolean(b) => out.writeVal(b)
case JNumber(n) => n match {
case JsonLong(l) => out.writeVal(l)
case _ => out.writeVal(n.toDouble)
}
case JArray(a) =>
out.writeArrayStart()
a.foreach(v => encodeValue(v, out))
out.writeArrayEnd()
case JObject(o) =>
out.writeObjectStart()
o.toIterable.foreach { case (k, v) =>
out.writeKey(k)
encodeValue(v, out)
}
out.writeObjectEnd()
}

override def nullValue: Json = Json.Null
}
}

另一种选择,如果您只需要提取有效载荷值的字节,那么您可以使用这样的代码以每秒约 300000 条消息的速率为提供的示例执行此操作:

import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros._
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
import scala.reflect.io.Streamable
import scala.util.hashing.MurmurHash3

case class Payload private(bs: Array[Byte]) {
def this(s: String) = this(s.getBytes(UTF_8))

override lazy val hashCode: Int = MurmurHash3.arrayHash(bs)

override def equals(obj: Any): Boolean = obj match {
case that: Payload => java.util.Arrays.equals(bs, that.bs)
case _ => false
}

override def toString: String = new String(bs, UTF_8)
}

object Payload {
def apply(s: String) = new Payload(s.getBytes)

implicit val codec: JsonValueCodec[Payload] = new JsonValueCodec[Payload] {
override def decodeValue(in: JsonReader, default: Payload): Payload = new Payload(in.readRawValAsBytes())

override def encodeValue(x: Payload, out: JsonWriter): Unit = out.writeRawVal(x.bs)

override val nullValue: Payload = new Payload(new Array[Byte](0))
}
}

case class MessageWithPayload(payload: Payload)

object MessageWithPayload {
implicit val codec: JsonValueCodec[MessageWithPayload] = JsonCodecMaker.make(CodecMakerConfig())

val jsonBytes: Array[Byte] = Streamable.bytes(getClass.getResourceAsStream("debezium.json"))
}

@State(Scope.Thread)
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1, jvmArgs = Array(
"-server",
"-Xms2g",
"-Xmx2g",
"-XX:NewSize=1g",
"-XX:MaxNewSize=1g",
"-XX:InitialCodeCacheSize=512m",
"-XX:ReservedCodeCacheSize=512m",
"-XX:+UseParallelGC",
"-XX:-UseBiasedLocking",
"-XX:+AlwaysPreTouch"
))
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class ExtractPayloadReading {
@Benchmark
def jsoniterScala(): MessageWithPayload = readFromArray[MessageWithPayload](MessageWithPayload.jsonBytes)
}

关于java - 快速解析具有较大可跳过区域的 JSON,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56872938/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com