gpt4 book ai didi

Hadoop:实现自定义 FileInputFormat 类时需要帮助

转载 作者:可可西里 更新时间:2023-11-01 14:22:13 26 4
gpt4 key购买 nike

我正在尝试使用 hadoop 为大学作业实现一些 Map/Reduce 作业。但目前我在实现自定义 FileInputFormat 类以将文件中的全部内容放入我的映射器时完全陷入困境。

我从《Hadoop 权威指南》(Hadoop: The Definitive Guide)中提取了这个例子,没有做任何改动。源代码可以编译通过,但运行时会抛出下面的异常(目前我在 Debian 5.0 上使用 Hadoop 1.0.2):

Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: org.myorg.ExampleFileInputFormat$WholeFileInputFormat.<init>()
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:575)
at org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:989)
at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:981)
at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:897)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824)
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261)
at org.myorg.ExampleFileInputFormat.run(ExampleFileInputFormat.java:163)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.myorg.ExampleFileInputFormat.main(ExampleFileInputFormat.java:172)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.NoSuchMethodException: org.myorg.ExampleFileInputFormat$WholeFileInputFormat.<init>()
at java.lang.Class.getConstructor0(Class.java:2706)
at java.lang.Class.getDeclaredConstructor(Class.java:1985)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:109)
... 21 more

我有点沮丧,因为我不明白发生了什么,而且通过网络搜索也没有找到任何有用的信息。也许你们当中有人可以看看我的源代码。出于调试目的,目前代码已被精简。


package org.myorg;
/*
*
*
*/
import java.io.IOException;
import java.util.*;
import java.io.*;
import java.util.regex.Pattern;
import java.util.regex.Matcher;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;


public class ExampleFileInputFormat extends Configured implements Tool {

/*
* <generics>
*/
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
return false;
}

@Override
public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
return new WholeFileRecordReader((FileSplit) split, job);
}
}

public class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> {

private FileSplit fileSplit;
private Configuration conf;
private boolean processed = false;

public WholeFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException {
this.fileSplit = fileSplit;
this.conf = conf;
}

@Override
public NullWritable createKey() {
return NullWritable.get();
}

@Override
public BytesWritable createValue() {
return new BytesWritable();
}

@Override
public long getPos() throws IOException {
return processed ? fileSplit.getLength() : 0;
}

@Override
public float getProgress() throws IOException {
return processed ? 1.0f : 0.0f;
}

@Override
public boolean next(NullWritable key, BytesWritable value) throws IOException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}

@Override
public void close() throws IOException {
// do nothing
}
}
/* </generics> */


/*
* <Task1>:
* */
/**
 * Debug mapper: ignores the file contents and emits the fixed pair
 * {@code ("test", 1)} once per input record.
 */
public static class ExampleMap extends MapReduceBase implements Mapper<NullWritable, BytesWritable, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    // Reused for every record instead of allocating a new Text per call.
    private final Text word = new Text("test");

    /**
     * @param key      ignored
     * @param value    ignored (whole-file contents)
     * @param output   receives the {@code ("test", 1)} pair
     * @param reporter not used
     * @throws IOException if the collector fails
     */
    public void map(NullWritable key, BytesWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        output.collect(word, one);
    }
}
/**
 * Reducer that adds up all counts received for a key and emits the total.
 */
public static class ExampleReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * @param key      the key whose counts are summed
     * @param values   the counts collected for {@code key}
     * @param output   receives {@code (key, total)}
     * @param reporter not used
     * @throws IOException if the collector fails
     */
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int total = 0;
        for (Iterator<IntWritable> counts = values; counts.hasNext(); ) {
            IntWritable count = counts.next();
            total += count.get();
        }
        IntWritable result = new IntWritable(total);
        output.collect(key, result);
    }
}
/* </Task1> */
/*
* <run>
**/
/**
 * Configures and runs the job selected by the first argument.
 *
 * @param args exactly three values: use case, input path, output path;
 *             the only recognised use case is {@code "cc_p"}
 * @return 0 once {@code JobClient.runJob} has returned, 1 if the arguments
 *         are missing or the use case is unknown
 * @throws Exception if deleting the old output or running the job fails
 */
public int run(String[] args) throws Exception {

    if (args.length != 3) {
        printUsage();
        return 1;
    }

    String useCase = args[0];
    String inputPath = args[1];
    String outputPath = args[2];

    // Build on the Tool's own configuration so that settings supplied through
    // ToolRunner are not silently dropped.
    JobConf conf = new JobConf(getConf(), ExampleFileInputFormat.class);
    FileOutputFormat.setOutputPath(conf, new Path(outputPath));
    FileInputFormat.setInputPaths(conf, new Path(inputPath));

    /* conf: Task1 */
    if (useCase.equals("cc_p")) {
        conf.setJobName("WordCount");
        /* Output: Key:Text -> Value:IntWritable */
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setOutputFormat(TextOutputFormat.class);
        /* Input: Key:NullWritable -> Value:BytesWritable (whole file) */
        conf.setInputFormat(WholeFileInputFormat.class);
        conf.setMapperClass(ExampleMap.class);
        conf.setReducerClass(ExampleReduce.class);
    }
    /* default-option: Exit */
    else {
        printUsage();
        return 1;
    }

    // Destructive step last: only wipe the old output once the arguments are
    // known to describe a job that will actually be submitted.
    deleteOldOutput(outputPath);

    JobClient.runJob(conf);
    return 0;
}
/* </run> */

/*
* <Main>
*/
/**
 * Command-line entry point: runs this tool through {@code ToolRunner} and
 * exits the JVM with the status code it returns.
 *
 * @param args command-line arguments, passed on to {@code ToolRunner.run}
 * @throws Exception whatever {@code ToolRunner.run} throws
 */
public static void main(String[] args) throws Exception {
    final Tool job = new ExampleFileInputFormat();
    final int exitCode = ToolRunner.run(job, args);
    System.exit(exitCode);
}
/* </Main> */

/*
* <Helper>
*/
/** Prints the expected command-line arguments to standard output. */
private void printUsage() {
    final String usage = "usage: [usecase] [input-path] [output-path]";
    System.out.println(usage);
}

/**
 * Recursively deletes the given output directory so a new job run can write to it.
 *
 * @param outputPath path of the output directory to remove
 * @throws IOException if the file system cannot be obtained or the delete fails
 */
private void deleteOldOutput(String outputPath) throws IOException {
    Path outputDir = new Path(outputPath);
    // Resolve the file system from the path itself rather than taking the
    // default one, so an output path on a different scheme is handled too.
    FileSystem fs = outputDir.getFileSystem(getConf());
    fs.delete(outputDir, true);
}
/* </Helper> */
}

谁能帮帮我?

来自德国的问候,亚历克斯

最佳答案

你需要使内部类静态:

public static class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

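否则 javac 为该内部类生成的构造函数会隐式要求传入一个外部类 ExampleFileInputFormat 的实例,因此这个类并没有真正的无参构造函数;而 Hadoop 是通过反射(ReflectionUtils.newInstance)调用无参构造函数来创建输入格式对象的,于是抛出了 NoSuchMethodException。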

关于Hadoop:实现自定义 FileInputFormat 类时需要帮助,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11457700/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com