
scala - Processing a txt file with Spark


I need to read a text file into a Dataset[T] in Spark. The file is badly formatted: it has some empty fields, and it is hard to define a delimiter to split the strings on. I have been trying to read the data into an RDD and then convert it to a case class type, but not all fields are parsed correctly, and I get this error:

java.lang.NumberFormatException: empty String
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
at java.lang.Double.parseDouble(Double.java:538)
at scala.collection.immutable.StringLike.toDouble(StringLike.scala:321)
at scala.collection.immutable.StringLike.toDouble$(StringLike.scala:321)
at scala.collection.immutable.StringOps.toDouble(StringOps.scala:33)
at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

How can I process this file correctly? My .txt file looks like this (anonymized random data, but in the same format):

NEW50752085  84.0485 -76.3851  85.1   THE NAME OF AN OBJECT                       
DEM00752631 51.9581 -85.3315 98.5 THE NAME OF AN OBJECT
KI004867205 40.8518 15.9351 276.5 THE NAME OF AN OBJECT FHG 41196

I have tried to process it like this:

val dataRdd = spark.sparkContext
  .textFile("file.txt")

val dataArray = dataRdd
  .map(_.split(" "))

case class caseClass(
  c1: String,
  c2: Double,
  c3: Double,
  c4: Double,
  c5: String,
  c6: String,
  c7: String
)

import spark.implicits._  // needed for .toDF()

val df = dataArray
  .map(record => (record(0), record(1).toDouble, record(2).toDouble, record(3).toDouble, record(4), record(5), record(6)))
  .map { case (c1, c2, c3, c4, c5, c6, c7) => caseClass(c1, c2, c3, c4, c5, c6, c7) }
  .toDF()

Best Answer

I am going to make some assumptions in this answer that may be wrong, but based on the data and the error you provided, I believe they are correct.

  • Assumption 1: your data is whitespace-delimited, with the delimiter made up of multiple spaces. I base this on the NumberFormatException for an empty string that you posted; we would not see that if your file were tab-delimited.
  • Assumption 2 (this one is for my own convenience and may not be correct): every data element is separated by the same number of spaces. For the rest of this answer I will assume that number is four. If that is not the case, this becomes a much harder problem.
  • Assumption 3: of the 7 data elements, only the last 2 are optional and are sometimes missing.

Your NumberFormatException is caused by splitting on a single space. Suppose the following line is space-delimited:

NEW50752085    84.0485    -76.3851    85.1    THE NAME OF AN OBJECT 

When you split on a single space, that line is turned into the following array:

Array(NEW50752085, "", "", "", 84.0485, "", "", "", -76.3851, "", "", "", 85.1, "", "", "", THE, NAME, OF, AN, OBJECT)

The second element of that array is an empty string, and it is the one you try to convert to a Double. That is why you get a NumberFormatException on an empty string.
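
A quick check outside Spark illustrates this (a minimal sketch; the literal string below is made up to match the assumed four-space layout):

// Two fields separated by four spaces, split on a single space:
val parts = "NEW50752085    84.0485".split(" ")
// parts: Array("NEW50752085", "", "", "", "84.0485")
parts(1).toDouble  // throws java.lang.NumberFormatException: empty String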

.map(_.split("    "))

When you change this to split on four spaces (which, per my assumption, may or may not be true), you get the following:

Array(NEW50752085, 84.0485, -76.3851, 85.1, THE NAME OF AN OBJECT)

But now we hit another problem: this array has only five elements! We want seven.

We can fix that by modifying your later code:

val df = dataArray.map(record => {
  (record(0), record(1).toDouble, record(2).toDouble, record(3).toDouble, record(4),
    if (record.size > 5) record(5) else "",
    if (record.size > 6) record(6) else "")
}).map { case (c1, c2, c3, c4, c5, c6, c7) => caseClass(c1, c2, c3, c4, c5, c6, c7) }.toDF

df.show
+-----------+-------+--------+----+--------------------+---+-----+
|         c1|     c2|      c3|  c4|                  c5| c6|   c7|
+-----------+-------+--------+----+--------------------+---+-----+
|NEW50752085|84.0485|-76.3851|85.1|THE NAME OF AN OB...|   |     |
|DEM00752631|51.9581|-85.3315|98.5|THE NAME OF AN OB...|   |     |
|KI004867205|40.8518| 15.9351|76.5|THE NAME OF AN OB...|FHG|41196|
+-----------+-------+--------+----+--------------------+---+-----+

Again, this only works if all elements are separated by the same number of spaces.
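
If the separators turn out not to be a fixed four spaces, one more tolerant variant (not from the original answer, just a sketch under the assumption that fields are separated by at least two spaces while words inside the object name are separated by single spaces) is to split on runs of two or more whitespace characters:

import spark.implicits._  // needed for .toDF()

val df = spark.sparkContext
  .textFile("file.txt")
  .map(_.trim.split("\\s{2,}"))       // split on runs of 2+ whitespace chars
  .map(r => caseClass(
    r(0), r(1).toDouble, r(2).toDouble, r(3).toDouble, r(4),
    if (r.length > 5) r(5) else "",   // optional 6th field
    if (r.length > 6) r(6) else ""))  // optional 7th field
  .toDF()

This keeps the fixed-width assumption out of the code entirely, at the cost of assuming the name field never contains two consecutive spaces.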

About "scala - Processing a txt file with Spark": the original question can be found on Stack Overflow: https://stackoverflow.com/questions/56505019/
