gpt4 book ai didi

hadoop - SparkContext.textFile 是如何工作的?

转载 作者:可可西里 更新时间:2023-11-01 14:49:24 29 4
gpt4 key购买 nike

我试图深入理解 textFile 方法,但我认为我的缺乏 Hadoop 知识让我退缩了。让我摆出我的理解,也许你可以纠正任何不正确的地方

sc.textFile(path) 被调用时,然后使用 defaultMinPartitions,这实际上只是 math.min(taskScheduler.defaultParallelism, 2)。让我们假设我们正在使用 SparkDeploySchedulerBackend,这是

conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(),
2))

所以,现在假设默认值为 2,回到 textFile,这是传递给 HadoopRDD。真实大小在 getPartitions() 中使用确定inputFormat.getSplits(jobConf, minPartitions)。但是,据我所知,分区只是一个提示,实际上大部分都被忽略了,所以你会大概得到 block 的总数。

好的,这符合预期,但是如果不使用默认值会怎样?您提供的分区大小大于 block 大小。如果我的研究是正确的,getSplits 调用只是忽略了这个参数,然后提供的最小值不会被忽略,你仍然会得到 block 大小?

Cross posted with the spark mailing list

最佳答案

简短版本:

Split大小由mapred.min.split.sizemapreduce.input.fileinputformat.split.minsize决定,如果大于HDFS的blockSize,则内部有多个block同一个文件将合并为一个拆分。

详细版本:

我认为你理解inputFormat.getSplits之前的过程是正确的。

inputFormat.getSplits里面,更具体地说,在FileInputFormat的getSplits里面,是mapred.min.split.size或者mapreduce .input.fileinputformat.split.minsize 最终确定拆分大小。 (我不确定哪个在 Spark 中有效,我更愿意相信前一个)。

让我们看看代码:FileInputFormat from Hadoop 2.4.0

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();

for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[] splitHosts = getSplitHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
String[] splitHosts = getSplitHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts));
}
} else {
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}

在for循环中,makeSplit()用于生成每个split,splitSize是有效的Split Size。用于生成 splitSize 的 computeSplitSize 函数:

protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}

因此,如果 minSplitSize > blockSize,则输出拆分实际上是同一个 HDFS 文件中多个 block 的组合,另一方面,如果 minSplitSize < blockSize,则每个拆分对应于 HDFS 的一个 block 。

关于hadoop - SparkContext.textFile 是如何工作的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30293938/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com