gpt4 book ai didi

Scala Spark 循环没有任何错误,但不产生输出

转载 作者:可可西里 更新时间:2023-11-01 16:38:17 28 4
gpt4 key购买 nike

我在 HDFS 中有一个文件,其中包含各种其他文件的路径。这是名为 file1 的文件:

path/of/HDFS/fileA
path/of/HDFS/fileB
path/of/HDFS/fileC
.
.
.

我在 Scala Spark 中使用 for 循环如下读取上述文件的每一行并在另一个函数中处理它:

val lines=Source.fromFile("path/to/file1.txt").getLines.toList

for(i<-lines){
i.toString()
val firstLines=sc.hadoopFile(i,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
}
}

当我运行上面的循环时,它运行完没有返回任何错误,我在新的一行中得到 Scala 提示:scala>

但是,当我试图查看应该存储在 firstLines 中的几行输出时,它不起作用:

scala> firstLines
<console>:38: error: not found: value firstLines
firstLine
^

上面的循环没有产生输出,但是运行没有任何错误,这是什么问题?

附加信息函数 hadoopFile 接受一个字符串路径名作为它的第一个参数。这就是为什么我试图在第一个参数 i 中将 file1 的每一行(每一行都是一个路径名)作为字符串传递。 flatMap 功能获取已传递给 hadoopFile 的文件的第一行并单独存储并转储所有其他行。因此,所需的输出 (firstLines) 应该是通过路径名 (i) 传递给 hadoopFile 的所有文件的第一行。

我尝试只为一个文件运行该函数,没有循环,并产生输出:

val firstLines=sc.hadoopFile("path/of/HDFS/fileA",classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
}

scala> firstLines.take(3)
res27: Array[String] = Array(<?xml version="1.0" encoding="utf-8"?>)

fileA 是一个 XML 文件,因此您可以看到该文件的结果第一行。所以我知道这个函数工作正常,这只是我无法弄清楚的循环问题。请帮忙。

最佳答案

变量 firstLines 定义在 for 循环体中,因此它的范围仅限于此循环。这意味着您无法在循环外访问变量,这就是 Scala 编译器告诉您 error: not found: value firstLines 的原因。

根据您的描述,我了解到您想收集 lines 中列出的每个文件的第一行。

这里的 every 可以在 Scala 中转换为不同的结构。我们可以使用类似于您编写的 for 循环的东西,或者甚至更好地采用函数式方法并使用应用于文件列表的 map 函数。在下面的代码中,我在 map 中放入了您在描述中使用的代码,它创建了一个 HadoopRDD 并将 flatMap 与您的函数一起应用以检索文件的第一行。

然后我们获得一个 RDD[String] 行列表。在这个阶段,请注意我们还没有开始做任何实际的工作。要触发 RDD 的评估并收集结果,我们需要为我们列表中的每个 RDD 添加调用 collect 方法。

// Renamed "lines" to "files" as it is more explicit.  
val fileNames = Source.fromFile("path/to/file1.txt").getLines.toList

val firstLinesRDDs = fileNames.map(sc.hadoopFile(_,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
})

// firstLinesRDDs is a list of RDD[String]. Based on this code, each RDD
// should consist in a single String value. We collect them using RDD#collect:
val firstLines = firstLinesRDDs.map(_.collect)

但是,这种方法存在一个缺陷,使我们无法从 Spark 提供的任何优势中受益。

当我们将 map 中的操作应用于 filenames 时,我们并没有使用 RDD,因此文件名在驱动程序(托管进程您的 Spark session )而不是可并行化 Spark 作业的一部分。这相当于执行您在第二个代码块中编写的操作,一次一个文件名。

为了解决这个问题,我们能做些什么?使用 Spark 时要牢记的一件好事是尝试在我们的代码中尽早推送 RDD 的声明。为什么?因为这样可以让 Spark 并行化优化我们要做的工作。您的示例可能是这个概念的教科书说明,尽管这里因操作文件的要求而增加了额外的复杂性。

在我们目前的案例中,我们可以受益于 hadoopFile 在输入中接受以逗号分隔的文件这一事实。因此,我们不是为每个文件依次创建 RDD,而是为所有文件创建一个 RDD:

val firstLinesRDD = sc.hadoopFile(fileNames.mkString(","), classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
}

然后我们用一个 collect 检索我们的第一行:

val firstLines = firstLinesRDD.collect

关于Scala Spark 循环没有任何错误,但不产生输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47189433/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com