
hadoop - How to put a file in memory using the Hadoop Distributed Cache?


As far as I know, the distributed cache copies files to every node, and a map or reduce task then reads the file from the local file system.

My question is: is there a way to put a file in memory using the Hadoop distributed cache, so that every map or reduce can read it directly from memory?

My MapReduce program distributes a PNG image of roughly 1 MB to every node; each map task then reads the image from the distributed cache and uses it for some image processing together with another image taken from the map input.
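For illustration, the pattern I am after looks roughly like this: the cached image is loaded once per task in setup() and then reused from memory in every map() call. This is only a sketch, and ImageMapper is a made-up name, not my real code:

import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;

import javax.imageio.ImageIO;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ImageMapper extends Mapper<Object, Text, Text, IntWritable> {

    // Held in memory for the lifetime of the map task.
    private BufferedImage cachedImage;

    @Override
    protected void setup(Context context) throws IOException {
        // Read the PNG once from the node-local copy created by the distributed cache.
        Path[] cached = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        cachedImage = ImageIO.read(new File(cached[0].toString()));
    }

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // ... use cachedImage together with the image referenced by the input record ...
    }
}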

Best answer
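The example below adds a file stored on HDFS to the distributed cache with DistributedCache.addCacheFile() in the driver, and then opens the node-local copy returned by DistributedCache.getLocalCacheFiles() inside the mapper (here it simply prints the file's lines on every map() call):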

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            // The cache file was copied to this node's local disk; get its local path(s).
            Path[] uris = DistributedCache.getLocalCacheFiles(context.getConfiguration());

            // Read the cached file from the local file system and print its contents.
            try {
                BufferedReader readBuffer1 = new BufferedReader(new FileReader(uris[0].toString()));
                String line;
                while ((line = readBuffer1.readLine()) != null) {
                    System.out.println(line);
                }
                readBuffer1.close();
            } catch (Exception e) {
                System.out.println(e.toString());
            }

            // Standard word-count tokenization of the map input.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            int length = key.getLength();
            System.out.println("length" + length);
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {

        final String NAME_NODE = "hdfs://localhost:9000";
        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Register the HDFS file with the distributed cache before submitting the job.
        DistributedCache.addCacheFile(new URI(NAME_NODE + "/dataset1.txt"),
                job.getConfiguration());

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
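Note that this code still opens the cached file from local disk on every map() call; to keep it in memory you would read it once in setup() and hold it in a field, as sketched in the question above. Also, org.apache.hadoop.filecache.DistributedCache is deprecated in Hadoop 2.x; the rough equivalents, shown here as fragments rather than a complete program, are:

// Driver: register the HDFS file with the job's cache.
job.addCacheFile(new URI("hdfs://localhost:9000/dataset1.txt"));

// Mapper or reducer: the registered files are available from the task context.
URI[] cacheFiles = context.getCacheFiles();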

Regarding "hadoop - How to put a file in memory using the Hadoop Distributed Cache?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/20539432/
