
java - How to process multiple files individually after SparkContext.wholeTextFiles?


I'm trying to use wholeTextFiles to read all of the file names in a folder and process each file individually (for example, I'm trying to get the SVD vector of each data set, and there are 100 sets in total). The data are stored in .txt files, with values separated by spaces and arranged in separate lines (like a matrix).

The problem I'm running into is that after I call wholeTextFiles("path with all the text files"), it's really hard to read and parse the data, and I can't use the approach I used when reading only one file. That approach works fine on a single file and gives me the correct output. Could someone tell me how to fix it here? Thanks!

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("whole text files").setMaster("local[2]").set("spark.executor.memory", "1g");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    JavaPairRDD<String, String> fileNameContentsRDD = jsc.wholeTextFiles("/Users/peng/FMRITest/regionOutput/");

    JavaRDD<String[]> lineCounts = fileNameContentsRDD.map(new Function<Tuple2<String, String>, String[]>() {
        @Override
        public String[] call(Tuple2<String, String> fileNameContent) throws Exception {
            // Parse one whole file's content into doubles (splitting on spaces only, so line breaks are not handled).
            String content = fileNameContent._2();
            String[] sarray = content.split(" ");
            double[] values = new double[sarray.length];
            for (int i = 0; i < sarray.length; i++) {
                values[i] = Double.parseDouble(sarray[i]);
            }

            // `pd` is carried over from the single-file version and is not defined in this snippet;
            // this is the part that no longer works after switching to wholeTextFiles.
            pd.cache();
            RowMatrix mat = new RowMatrix(pd.rdd());

            SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(84, true, 1.0E-9d);
            Vector s = svd.s();
        }
    });
}

Best Answer

Quoting the scaladoc of SparkContext.wholeTextFiles:

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

In other words, wholeTextFiles might not simply be what you want.

Since "small files are preferred" by design (see the scaladoc), you could use mapPartitions or collect (with filter) to grab a subset of the files and apply the parsing to them, as in the sketch below.
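For illustration only (this sketch is mine, not from the original answer): one way to do the collect-with-filter route, assuming the layout described in the question, i.e. one space-separated matrix per .txt file under the question's directory. The class name PerFileParse and the .txt filter are assumptions.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class PerFileParse {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("per-file parse").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // One (path, content) pair per file.
        JavaPairRDD<String, String> files =
                jsc.wholeTextFiles("/Users/peng/FMRITest/regionOutput/");

        // Keep only the .txt files and bring the pairs back to the driver;
        // this is acceptable because wholeTextFiles is meant for small files.
        List<Tuple2<String, String>> subset = files
                .filter(pair -> pair._1().endsWith(".txt"))
                .collect();

        for (Tuple2<String, String> pair : subset) {
            String content = pair._2();
            // Each non-empty line is one matrix row; values are whitespace-separated.
            List<double[]> rows = new ArrayList<>();
            for (String line : content.split("\n")) {
                line = line.trim();
                if (line.isEmpty()) continue;
                String[] tokens = line.split("\\s+");
                double[] row = new double[tokens.length];
                for (int j = 0; j < tokens.length; j++) {
                    row[j] = Double.parseDouble(tokens[j]);
                }
                rows.add(row);
            }
            // rows now holds one file's matrix; hand it to whatever per-file computation you need.
        }

        jsc.stop();
    }
}

Everything after collect() runs on the driver, so each file can be handled independently without nesting RDD operations inside a map.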

Once you have the files per partition, you can use Scala's Parallel Collection API and schedule Spark jobs to execute in parallel:

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users).

By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly.
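To make the "separate threads" idea concrete, here is a rough sketch (my own illustration, not code from the answer): collect the (path, content) pairs, then submit one small SVD job per file from a fixed thread pool so the scheduler can run them concurrently. The class name PerFileSvd, the pool size of 4, and the reuse of k = 84 from the question's snippet are assumptions.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.SingularValueDecomposition;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

import scala.Tuple2;

public class PerFileSvd {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("per-file SVD").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Collect the (path, content) pairs to the driver; the files are assumed to be small.
        List<Tuple2<String, String>> files =
                jsc.wholeTextFiles("/Users/peng/FMRITest/regionOutput/").collect();

        // Each thread submits an independent Spark job; the scheduler is thread-safe.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (Tuple2<String, String> file : files) {
            String path = file._1();
            String content = file._2();
            pool.submit(() -> {
                // Parse one file: each non-empty line is a row, values are whitespace-separated.
                List<Vector> rows = new ArrayList<>();
                for (String line : content.split("\n")) {
                    line = line.trim();
                    if (line.isEmpty()) continue;
                    String[] tokens = line.split("\\s+");
                    double[] values = new double[tokens.length];
                    for (int i = 0; i < tokens.length; i++) {
                        values[i] = Double.parseDouble(tokens[i]);
                    }
                    rows.add(Vectors.dense(values));
                }

                // One small RowMatrix (and SVD job) per file; k = 84 as in the question's snippet.
                JavaRDD<Vector> rowRdd = jsc.parallelize(rows);
                RowMatrix mat = new RowMatrix(rowRdd.rdd());
                SingularValueDecomposition<RowMatrix, Matrix> svd =
                        mat.computeSVD(84, true, 1.0E-9d);
                Vector s = svd.s();
                System.out.println(path + " singular values: " + s);
            });
        }

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
        jsc.stop();
    }
}

Each task only uses the driver-side JavaSparkContext (to call parallelize and launch computeSVD) and never touches it inside an RDD closure, so the per-file computations are ordinary concurrent Spark jobs that the FIFO scheduler described above can interleave.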

Regarding "java - How to process multiple files individually after SparkContext.wholeTextFiles?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/44935975/
