
Hadoop ChainMapper, ChainReducer

Reposted. Author: 可可西里. Updated: 2023-11-01 14:18:50


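I am new to Hadoop and am trying to figure out how to chain jobs programmatically (multiple mappers and a reducer) using ChainMapper and ChainReducer. I have found a few partial examples, but not a single complete, working one.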

My current test code is:

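import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainJobs extends Configured implements Tool {

    // First mapper in the chain: emits (word, 1) for every token of an input line.
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    // Second mapper in the chain: receives the (word, count) pairs emitted by Map.
    // As in the original test code, it tokenizes the string form of the value, not the key.
    public static class Map2 extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken().concat("Justatest"));
                output.collect(word, one);
            }
        }
    }

    // Reducer: sums the counts collected for each key.
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    // Declares "throws Exception" because JobClient.runJob throws a checked IOException.
    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        JobConf job = new JobConf(conf);

        job.setJobName("TestforChainJobs");
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Chain: Map -> Map2 -> Reduce, each stage with its own private JobConf.
        JobConf map1Conf = new JobConf(false);
        ChainMapper.addMapper(job, Map.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, map1Conf);

        JobConf map2Conf = new JobConf(false);
        ChainMapper.addMapper(job, Map2.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, map2Conf);

        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ChainJobs(), args);
        System.exit(res);
    }
}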

However, the map task fails with:

MapAttempt TASK_TYPE="MAP" TASKID="task_201210162337_0009_m_000000" TASK_ATTEMPT_ID="attempt_201210162337_0009_m_000000_0" TASK_STATUS="FAILED" FINISH_TIME="1350397216365" HOSTNAME="localhost\.localdomain" ERROR="java\.lang\.RuntimeException: Error in configuring object
at org\.apache\.hadoop\.util\.ReflectionUtils\.setJobConf(ReflectionUtils\.java:106)
at org\.apache\.hadoop\.util\.ReflectionUtils\.setConf(ReflectionUtils\.java:72)
at org\.apache\.hadoop\.util\.ReflectionUtils\.newInstance(ReflectionUtils\.java:130)
at org\.apache\.hadoop\.mapred\.MapTask\.runOldMapper(MapTask\.java:389)
at org\.apache\.hadoop\.mapred\.MapTask\.run(MapTask\.java:327)
at org\.apache\.hadoop\.mapred\.Child$4\.run(Child\.java:268)
at java\.security\.AccessController\.doPrivileged(Native Method)
at javax\.security\.auth\.Subject\.doAs(Subject\.java:396)

Any hints, or a very simple working example, would be much appreciated.
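To make the intended data flow of the test code concrete, here is a minimal plain-Java sketch of the Map -> Map2 -> Reduce chain. It uses no Hadoop classes and does not reproduce or explain the error above. The names ChainSketch, map1, map2 and runChain are hypothetical and exist only for this illustration, and it assumes Map2 is used exactly as posted, where IntWritable.toString() yields the decimal string of the count.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class ChainSketch {

    // Stage 1, mirroring Map: emit (word, 1) for every token in the line.
    public static List<Map.Entry<String, Integer>> map1(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            out.add(new SimpleEntry<>(tokenizer.nextToken(), 1));
        }
        return out;
    }

    // Stage 2, mirroring Map2 as posted: it tokenizes the VALUE (the count),
    // not the key, so the incoming word is discarded.
    public static List<Map.Entry<String, Integer>> map2(String key, int value) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(String.valueOf(value));
        while (tokenizer.hasMoreTokens()) {
            out.add(new SimpleEntry<>(tokenizer.nextToken().concat("Justatest"), 1));
        }
        return out;
    }

    // Pipes every record through map1 and map2, then sums per key like Reduce.
    public static Map<String, Integer> runChain(List<String> lines) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> first : map1(line)) {
                for (Map.Entry<String, Integer> second : map2(first.getKey(), first.getValue())) {
                    sums.merge(second.getKey(), second.getValue(), Integer::sum);
                }
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // prints {1Justatest=3}
        System.out.println(runChain(List.of("hello world hello")));
    }
}
```

Under these assumptions every record leaving the second stage carries the key "1Justatest", because the second mapper reads the count rather than the word. If the intent was to append the suffix to each word, the second stage would need to build its output key from the incoming key instead.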
