gpt4 book ai didi

hadoop - hadoop设置方法映射器

转载 作者:行者123 更新时间:2023-12-02 21:51:12 25 4
gpt4 key购买 nike

我目前正在尝试学习hadoop编程,并编写在一个映射器中处理两个输入源的程序。这项工作与mapside-join问题相似。

因此,我首先使用了分布式缓存,但是它不能很好地工作。
因此,我第二次使用了setup()函数。它可以在单台PC上的本地执行模式下很好地工作,但是,在群集环境中则不能工作。

我不知道确切原因。

如果使用setup()函数,集群是否有任何配置?

以下是我的代码的一部分。这部分是体现迭代工作的作业驱动程序。

public int run(String[] arg0) throws Exception {


// TODO Auto-generated method stub

int iteration = 1;

Configuration conf = new Configuration();

Path in = new Path(arg0[0]);
Path out = new Path(arg0[1]+"iteration_"+iteration);

conf.set("conf.threshold", arg0[2]);

Job job = new Job(conf, "Test");
job.setJarByClass(getClass());
job.setMapperClass(FirstMap.class);
job.setReducerClass(FirstReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
job.waitForCompletion(true);


// start second job

// long counter = 4;//job.getCounters().findCounter(SecondReduce.Counter.CONVERGED).getValue();
String PriorPath = out.toString();

boolean Updates = true;
while (Updates) {

iteration ++;
conf = new Configuration();

Path out2 = new Path(arg0[1]+"iteration_"+iteration);

conf.set("prior.job.out", PriorPath);
conf.set("conf.iteration", iteration+"");
job = new Job(conf, "job"+iteration);
job.setJarByClass(getClass());
job.setMapperClass(SecondMap.class);
job.setReducerClass(SecondReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);


FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out2);
job.waitForCompletion(true);

PriorPath = out2.toString();

long counter = job.getCounters().findCounter(Counter.CONVERGED).getValue();
Updates = (counter > 0);
System.out.println("counter : " + counter);
}

return 0;
}

另外,包括设置功能的映射器如下。
public static class SecondMap extends
Mapper<LongWritable, Text, Text, IntWritable> {

IntWritable one = new IntWritable(1);
Vector<String> Vec = new Vector<String>();
Vector<String> Gen = new Vector<String>();
int iteration;
@Override
public void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
Path Cand = new Path(conf.get("prior.job.out"));
// iteration = Integer.parseInt(conf.get("conf.iteration"));
String iter = conf.get("conf.iteration");
iteration = Integer.parseInt(iter);

try {
FileSystem fs = FileSystem.get(conf);
FileStatus[] status = fs.listStatus(Cand);
for (int i = 0; i < status.length; i++) {
BufferedReader br = new BufferedReader(
new InputStreamReader(fs.open(status[i].getPath())));
String line;
line = br.readLine();
while (line != null) {
System.out.println(line);

Vec.add(line);
line = br.readLine();
}
}
} catch (Exception e) {
System.out.println("File not found");
}
Gen = GenerateCandidate(Vec, iteration);

}

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// something with CandGen

}
}
}

有人对此问题有经验吗?

最佳答案

每个Mapper任务或Reducer任务仅调用一次。因此,如果为一个工作生成了10个映射器或化简器,那么对于每个映射器和化简器,它将被调用一次。
在此方法中添加内容的一般准则是一次需要执行的任何任务都可以在此处编写,例如获取分布式缓存的路径,将参数传递给Mappers和Reducer。
清理方法类似。

关于hadoop - hadoop设置方法映射器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20745193/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com