gpt4 book ai didi

Scala Spark 行级错误处理

转载 作者:行者123 更新时间:2023-12-01 16:30:35 24 4
gpt4 key购买 nike

我在弄清楚如何使用 Scala Spark 程序进行一些行级错误处理时遇到了一些麻烦。在下面的代码中,我正在读取 CSV 文本文件,解析它,并使用 mapSchema 方法创建行(未显示;基本上,它采用从 CSV 生成的字符串数组,并使用模式将字符串转换为整数、 double 、日期等)。当数据的格式都正确时,它会非常有效。但是,如果我有一个坏行——例如,其中的字段少于预期——我想执行一些错误处理。

val rddFull = sqlContext.sparkContext.textFile(csvPath).map {
case(txt) =>
try {
val reader = new CSVReader(new StringReader(txt), delimiter, quote, escape, headerLines)
val parsedRow = reader.readNext()
Row(mapSchema(parsedRow, schema) : _*)
} catch {
case err: Throwable =>
println("a record had an error: "+ txt)
throw new RuntimeException("SomeError")
}

问题是 try/catch 表达式似乎不起作用。当我给它坏行时,我永远不会得到“SomeError”RuntimeException。相反,我得到了与不使用 try/catch 时得到的相同的错误。

关于这里可能出什么问题有什么想法吗?

最佳答案

您需要在正确的位置查找日志。首先:捕获确实有效。以下是 Spark-shell 的示例:

val d = sc.parallelize(0 until 10)
val e = d.map{ n =>
try {
if (n % 3==0) throw new IllegalArgumentException("That was a bad call")
println(n)
} catch {
case e: IllegalArgumentException => throw new UnsupportedOperationException("converted from Arg to Op except")
}
}
e.collect

结果如下:注意异常已被正确捕获并转换:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in
stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in
stage 0.0 (TID 5, localhost):
java.lang.UnsupportedOperationException: converted from Arg to Op except
at $anonfun$1.apply$mcVI$sp(<console>:29)
at $anonfun$1.apply(<console>:24)
at $anonfun$1.apply(<console>:24)

尝试查看一个或多个工作线程的 stderr 日志。

关于Scala Spark 行级错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31888367/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com