gpt4 book ai didi

scala - 循环遍历文件行并通过 Spark 中的每次迭代执行函数

转载 作者:可可西里 更新时间:2023-11-01 15:26:09 29 4
gpt4 key购买 nike

我在 HDFS 中有一个名为 file1 的文件,其中包含以下几行:(每一行都是一个目录路径)

this/is/path1
this/is/path2
this/is/path3
.
.
.
this/is/path1000ormore

我有一个 Scala Spark 函数如下:

val resultset=sc.hadoopFile(inputpath,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
}

我想传递“file1”中的每一行来代替 hadoopFile 函数中的“inputpath”(需要是一个字符串),并获取每次迭代/循环的结果。我该怎么做?

额外信息:

函数的实际作用: 上面的函数从指定的目录路径中获取第一个文件来代替“inputpath”,并给出文件的第一行。我想对存储在“file1”中的所有目录路径执行此操作,因此我正在寻找有关如何在循环/迭代中执行此操作的解决方案。

更新:我试着把它放在这样的循环中:

val lines=Source.fromFile("/path/to/file1.txt").getLines.toList
for(i<-lines){
val firstLines=sc.hadoopFile(i,classOf[TextInputFormat],classOf[LongWritable],classOf[Text]).flatMap {
case (k, v) => if (k.get == 0) Seq(v.toString) else Seq.empty[String]
}

这运行了大约 10 分钟(文件 1 包含大约 34,000 行)并且没有出现任何错误。但是当我尝试使用以下命令查看几行输出时,

firstLines.take(3)

我收到一条错误消息:

error: not found: value firstLines
firstLines
^

所以我认为循环没有成功运行,因此 firstLines 从未被创建,尽管我不知道问题可能是什么。有人可以提供解决方案吗?

最佳答案

可以分两步实现:

  1. 像往常一样从 HDFS 读取“file1”文本文件,获取所有元素;
  2. 对于 1) 中的每一项,应用“结果集”逻辑。

另外 2) 可以改进:1) 中的所有项目都可以在一个字符串中用逗号连接,字符串作为“inputpath”参数传递。您将拥有一个包含所有文件数据的 RDD。可以应用过滤器“k.get == 0”以获得最终结果。

首先可以这样实现:

val lines = Source.fromFile("file1.txt").getLines.toSeq.view

val resultDF = lines.map(current =>
sc.hadoopFile(current, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).filter(_._1 == 0)
).reduce(_ union _)

resultDF.take(3).foreach(println)

关于scala - 循环遍历文件行并通过 Spark 中的每次迭代执行函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47104704/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com