gpt4 book ai didi

apache-spark - 在 spark java 中读取带有 corrupt_record 的 json 文件

转载 作者:行者123 更新时间:2023-12-05 04:20:24 24 4
gpt4 key购买 nike

我正在使用 spark 2.7 版的 spark java 应用程序。我正在尝试根据我的模式加载可能包含损坏记录的多行 JSON 文件。我在加载它时传递了一个模式,但问题是它拒绝整个文件作为一个损坏的记录,即使有一个 JSON 对象不满足我提供的模式。

我的 Json 文件看起来像-

[
{Json_object},
{Json_object},
{Json_object}
]

我为它手动创建了(StructType 的)架构并加载它 -

Dataset<Row> df = spark.read().option("multiline", "true").option("mode","PERMISSIVE").option("columnNameOfCorruptRecord","_corrupt_record").schema(schema).json("filepath");

问题是,即使一个 JSON 对象不遵循模式,例如,如果我的模式中的 attribute1 具有整数类型,并且它是 json 对象之一的字符串形式,那么 json 对象应该进入 corrupted_record,我得到了类似的东西-

+------------+---------------+---------------+
| attribute1 | attribute2 |_corrupt_record|
+------------+---------------+---------------+
| null | null | [{|
| | | all_json_obj |
| | | ... |
| | | }] |
+------------+---------------+---------------+

它在典型的单行 json 对象中工作得非常好,其中换行符 '\n' 被用作分隔符,没有遇到任何问题和理想的结果。有人可以告诉我我在这里错过了什么吗?

PS:问题不限于spark java,scala和python的行为也是一样的。

最佳答案

恐怕这行不通,至少对于当前版本的 Spark 是行不通的。

我不是 Spark 的提交者,但我进行了调查,这就是我的发现。我不确定这是否 100% 正确,但也许对您有用(至少作为进一步调查的良好起点)

我深入研究了 Spark 代码,发现多行文件和标准文件之间存在很大差异:

  • 将 multiline 设置为 false Spark 正在使用 TextInputJsonDataSource 读取此文件,在这里您可以在代码中看到读取操作的样子 Spark Source Code :

    override def readFile(
    conf: Configuration,
    file: PartitionedFile,
    parser: JacksonParser,
    schema: StructType): Iterator[InternalRow] = {
    val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
    Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))
    val textParser = parser.options.encoding
    .map(enc => CreateJacksonParser.text(enc, _: JsonFactory, _: Text))
    .getOrElse(CreateJacksonParser.text(_: JsonFactory, _: Text))

    val safeParser = new FailureSafeParser[Text](
    input => parser.parse(input, textParser, textToUTF8String),
    parser.options.parseMode,
    schema,
    parser.options.columnNameOfCorruptRecord)
    linesReader.flatMap(safeParser.parse)
    }

在这里我们可以看到 Spark 正在逐行读取文件,然后调用 flatMap 使用解析器处理每一行,以便以后很容易找到格式错误的记录并为它们生成_corrupt_record

当您将 multiline 选项设置为 true 时,Spark 将使用 MultiLineJsonDataSource(剧透 - 它以前称为 WholeFileJsonDataSource)。在这里你可以找到读取数据的函数:Spark source code

  override def readFile(
conf: Configuration,
file: PartitionedFile,
parser: JacksonParser,
schema: StructType): Iterator[InternalRow] = {
def partitionedFileString(ignored: Any): UTF8String = {
Utils.tryWithResource {
CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
} { inputStream =>
UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
}
}
val streamParser = parser.options.encoding
.map(enc => CreateJacksonParser.inputStream(enc, _: JsonFactory, _: InputStream))
.getOrElse(CreateJacksonParser.inputStream(_: JsonFactory, _: InputStream))

val safeParser = new FailureSafeParser[InputStream](
input => parser.parse[InputStream](input, streamParser, partitionedFileString),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)

safeParser.parse(
CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
}

现在让我们看一下 JsonParser 及其通用函数解析:Spark source code

  def parse[T](
record: T,
createParser: (JsonFactory, T) => JsonParser,
recordLiteral: T => UTF8String): Iterable[InternalRow] = {
try {
Utils.tryWithResource(createParser(factory, record)) { parser =>
// a null first token is equivalent to testing for input.trim.isEmpty
// but it works on any token stream and not just strings
parser.nextToken() match {
case null => None
case _ => rootConverter.apply(parser) match {
case null => throw QueryExecutionErrors.rootConverterReturnNullError()
case rows => rows.toSeq
}
}
}
} catch {
case e: SparkUpgradeException => throw e
case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
// JSON parser currently doesnt support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`
throw BadRecordException(() => recordLiteral(record), () => None, e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
"""JSON parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResult = () => Some(row),
cause)
}
}

在这里您可以看到 Json 没有生成 PartialResultException,但可能是这两个异常中的一个:JsonProcessingException |异常输入异常

由于此代码抛出此异常:BadRecordException(() => recordLiteral(record), () => None, e) where record = our stream = whole file.

此异常稍后由 FailureSafeParser 解释,它为您生成输出行,并且只是将数据复制到 _corrupt_record

总的来说,我试图在提交和 Jira 中找到信息,但我认为这个主题真是一团糟。我找到了初始提交,它使用此消息添加了此功能:

[SPARK-18352][SQL] Support parsing multiline json files

## What changes were proposed in this pull request?

If a new option `wholeFile` is set to `true` the JSON reader will parse each file (instead of a single line) as a value. This is done with Jackson streaming and it should be capable of parsing very large documents, assuming the row will fit in memory.

Because the file is not buffered in memory the corrupt record handling is also slightly different when `wholeFile` is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired.

“如果存在解析失败,损坏的列将包含文件名而不是文字 JSON”- 看起来后来有所更改(实际上您在此列中有文字 Json),但我认为一般方法是相同的。

所以回到问题:“我想知道这是预期的行为还是只是一个错误!” - 我认为这不是错误,也不是预期的行为,而是 Jackson 解析器最初实现方式的结果,目前我们必须忍受这一点

关于apache-spark - 在 spark java 中读取带有 corrupt_record 的 json 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74489538/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com