
java - Type mismatch in key from map when replacing Mapper with MultithreadMapper

Reposted. Author: 可可西里. Updated: 2023-11-01 14:55:15

I want to use a MultithreadedMapper in my MapReduce job.

To do this, I replaced Mapper with MultithreadedMapper in my otherwise working code.

This is the exception I get:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)

Here is the job setup code:

public static void main(String[] args) {
    try {
        if (args.length != 2) {
            System.err.println("Usage: MapReduceMain <input path> <output path>");
            System.exit(123);
        }
        Job job = new Job();
        job.setJarByClass(MapReduceMain.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
        FileStatus[] files = fs.listStatus(new Path(args[0]));
        for (FileStatus sfs : files) {
            FileInputFormat.addInputPath(job, sfs.getPath());
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MyMultithreadMapper.class);
        job.setReducerClass(MyReducer.class);
        MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(MyPage.class);

        job.setOutputFormatClass(SequenceFileOutputFormat.class); // write the result as a sequence file

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Here is the mapper code:

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {

    ConcurrentLinkedQueue<MyScraper> scrapers = new ConcurrentLinkedQueue<MyScraper>();

    public static final int nThreads = 5;

    public MyMultithreadMapper() {
        for (int i = 0; i < nThreads; i++) {
            scrapers.add(new MyScraper());
        }
    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        MyScraper scraper = scrapers.poll();

        MyPage result = null;
        for (int i = 0; i < 10; i++) {
            try {
                result = scraper.scrapPage(value.toString(), true);
                break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        if (result == null) {
            result = new MyPage();
            result.setUrl(key.toString());
        }

        context.write(new IntWritable(result.getUrl().hashCode()), result);

        scrapers.add(scraper);
    }
}

Why on earth am I getting this?

Best answer

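This is what has to be done:

MultithreadedMapper.setMapperClass(job, MyMapper.class);

MyMapper must implement the map logic (it extends Mapper<LongWritable, Text, IntWritable, MyPage>).

The MultithreadedMapper subclass (MyMultithreadMapper) must be empty. It stays registered with job.setMapperClass, or you can register MultithreadedMapper.class there directly.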
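Why this works: the bottom frames of the stack trace (MultithreadedMapper$MapRunner.run, then Mapper.run, then Mapper.map) show that the map() overridden on MyMultithreadMapper never runs. MultithreadedMapper.run() does not call its own map(); each worker thread creates an instance of the class registered with MultithreadedMapper.setMapperClass, which defaults to the identity Mapper, and the identity map() forwards the LongWritable offset key from TextInputFormat unchanged. The sketch below is a Hadoop-free model of that dispatch. All class names in it (ModelMapper, ModelMultithreadedMapper, OverridingOuter, MyInnerMapper, DelegationDemo) are hypothetical stand-ins, Long and Integer replace LongWritable and IntWritable, and the inner mapper runs on a single thread for simplicity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Entry point declared first so `java DelegationDemo.java` can launch it.
class DelegationDemo {
    // Feeds two (offset, line) records through a model job and returns the emitted keys.
    static List<Object> keysFrom(ModelMultithreadedMapper job) {
        return job.run(List.of(0L, 42L), List.of("http://a", "http://b"));
    }

    public static void main(String[] args) {
        // Question's setup: map() overridden on the outer class, inner class left at the default.
        System.out.println(keysFrom(new OverridingOuter()));
        // Answer's setup: the logic lives in the inner mapper registered with the outer one.
        System.out.println(keysFrom(new ModelMultithreadedMapper(MyInnerMapper::new)));
    }
}

// Hypothetical stand-in for org.apache.hadoop.mapreduce.Mapper: the default map() is the identity.
class ModelMapper {
    Object map(Long key, String value) {
        return key; // forwards the input key unchanged
    }
}

// Hypothetical stand-in for MultithreadedMapper: run() never calls this.map(). It creates an
// instance of the configured inner mapper (Hadoop does this once per thread; once here)
// and calls that instance's map().
class ModelMultithreadedMapper extends ModelMapper {
    private final Supplier<? extends ModelMapper> innerFactory;

    ModelMultithreadedMapper(Supplier<? extends ModelMapper> innerFactory) {
        this.innerFactory = innerFactory;
    }

    List<Object> run(List<Long> keys, List<String> values) {
        ModelMapper inner = innerFactory.get();
        List<Object> emitted = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            emitted.add(inner.map(keys.get(i), values.get(i)));
        }
        return emitted;
    }
}

// Mirrors MyMultithreadMapper from the question: the override sits on the outer class.
class OverridingOuter extends ModelMultithreadedMapper {
    OverridingOuter() {
        super(ModelMapper::new); // like never calling MultithreadedMapper.setMapperClass
    }

    @Override
    Object map(Long key, String value) {
        return value.hashCode(); // would emit an Integer key, but run() never calls it
    }
}

// Mirrors the answer's MyMapper: the map logic lives in the inner mapper class.
class MyInnerMapper extends ModelMapper {
    @Override
    Object map(Long key, String value) {
        return value.hashCode(); // Integer key, like new IntWritable(url.hashCode())
    }
}
```

Running main first prints [0, 42], the Long keys from the identity map() (the situation behind the exception), and then two Integer hash codes, showing that the inner mapper's map() ran.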

For more on java - Type mismatch in key from map when replacing Mapper with MultithreadMapper, see the similar question we found on Stack Overflow: https://stackoverflow.com/questions/7563353/
