gpt4 book ai didi

scala - 将文件名哈希附加到 Spark RDD 的每一行

转载 作者:行者123 更新时间:2023-12-01 04:42:28 25 4
gpt4 key购买 nike

我正在尝试将文件目录加载到 Spark RDD 中,我需要为每一行附加原始文件名。

我找不到使用 sc.textFile 对常规 RDD 执行此操作的方法,因此我现在尝试使用 WholeTextFiles 方法来加载每个文件及其文件名。

我正在使用这段代码:

val lines = 
sc.wholeTextFiles(logDir).flatMap{ case (filename, content) =>
val hash = modFiles.md5(filename)
content.split("\n")
.filter(line =>
<filter conditions>
.map(line => line+hash)
}

但是这段代码给我一个 Java 堆内存不足错误,我猜它是在尝试一次加载所有文件吗?

是否有一种不使用 wholeTextFiles 来解决此问题的方法和/或是否有一种方法可以不使用 wholeTextFiles 一次加载所有文件?

最佳答案

解决方法是应用本页代码:http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

// Create the text file
val text = sc.hadoopFile(logDir,
classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)

// Cast to a HadoopRDD
val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]
val linesRaw = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) ⇒
// get file name hash - you need to define your own hash function
val fileHash = hash(inputSplit.asInstanceOf[FileSplit].getPath.toString)
// input split is in _1 and line is in _2
iterator.map(splitAndLine => splitAndLine._2+fileHash)
}

与使用 sc.textFile 相比,使用此代码的性能下降了约 10%

关于scala - 将文件名哈希附加到 Spark RDD 的每一行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32835544/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com