
hadoop - Hadoop CustomInputFormat is not being called


I have written a custom InputFormat and configured it on the job, yet it is never invoked. I added some System.out.println statements to the code, but none of them are printed when the job runs. Even if I comment out the custom InputFormat in the driver class, the output stays exactly the same. What am I missing?

Driver class

public class TestDriver {

    public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        Job job = new Job(conf, "Custom Format");
        job.setMapperClass(CustomInputFormatmapper.class);
        job.setReducerClass(CustomInputFormatReducer.class);
        job.setInputFormatClass(CustomInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.getConfiguration().set("fs.file.impl", "com.learn.WinLocalFileSystem");
        String inputPath = "In\\VISA_Details.csv";
        Path inPath = new Path(inputPath);
        String outputPath = "C:\\Users\\Desktop\\Hadoop learning\\output\\run1";
        Path outPath = new Path(outputPath);

        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        System.out.println(job.waitForCompletion(true));
    }
}

Custom InputFormat
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class CustomInputFormat extends TextInputFormat {

    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) {
        System.out.println(" ------------ INSIDE createRecordReader()--------------");
        return new CustomRecordReader();
    }
}

Custom RecordReader
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;

public class CustomRecordReader extends RecordReader {

    private CompressionCodecFactory compressionCodecs;
    private final int NLINESTOPROCESS = 3;
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key;
    private Text value;

    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub
    }

    @Override
    public Object getCurrentKey() throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public Object getCurrentValue() throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        return 0;
    }

    @Override
    public void initialize(InputSplit inputsplit, TaskAttemptContext taskattemptcontext)
            throws IOException, InterruptedException {
        System.out.println(" ---------- INSIDE INITILISE: THIS IS NOT PRINTING----------");
        FileSplit split = (FileSplit) inputsplit;
        Configuration job = taskattemptcontext.getConfiguration();
        maxLineLength = job.getInt("mapred.linerecordreader.maxlength", 2147483647);
        start = split.getStart();
        end = start + split.getLength();
        Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        CompressionCodec codec = compressionCodecs.getCodec(file);
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        boolean skipFirstLine = false;
        if (codec != null) {
            in = new LineReader(codec.createInputStream(fileIn), job);
            end = 9223372036854775807L;
        } else {
            if (start != 0L) {
                skipFirstLine = true;
                start--;
                fileIn.seek(start);
            }
            in = new LineReader(fileIn, job);
        }
        if (skipFirstLine)
            start += in.readLine(new Text(), 0, (int) Math.min(2147483647L, end - start));
        pos = start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        System.out.println(" ---------- INSIDE nextKeyValue()------------");
        if (key == null) {
            key = new LongWritable();
        }
        if (value == null) {
            value = new Text();
        }
        key.set(pos);
        value.clear();

        final Text newLine = new Text("\n");
        Text newVal = new Text();
        int newSize = 0;

        for (int i = 0; i < NLINESTOPROCESS; i++) {
            Text v = new Text();

            while (pos < end) {
                newSize = in.readLine(v, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
                value.append(v.getBytes(), 0, v.getLength());
                value.append(newLine.getBytes(), 0, newLine.getLength());

                if (newSize == 0) {
                    break;
                }
                pos += newSize;
                if (newSize < maxLineLength) {
                    break;
                }
            }
        }

        return false;
    }
}

Mapper class
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CustomInputFormatmapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

    public void map(LongWritable key, Text val, Context context) throws IOException, InterruptedException {

        String value = val.toString();
        String[] totalRows = value.split("\n");
        int count = totalRows.length;

        context.write(new LongWritable(Long.valueOf(count)), new LongWritable(1L));
    }
}

Reducer class
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class CustomInputFormatReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

    public void reduce(LongWritable key, Iterable<LongWritable> val, Context context) throws IOException, InterruptedException {
        System.out.println(" --------REDUCER--------");
        long count = 0;
        for (LongWritable vals : val) {
            count++;
        }
        context.write(key, new LongWritable(count));
    }
}

Best answer

I am answering my own question, since it may help others who run into the same problem. The issue was with the packages I was importing. The mistakes I made are listed below.

Custom InputFormat class

1) Missed the @Override annotation.
2) Imported from org.apache.hadoop.mapred instead of org.apache.hadoop.mapreduce (in the code above it is TaskAttemptContext that comes from the wrong package). With the wrong parameter type, createRecordReader() does not actually override the method the framework calls, so TextInputFormat's default RecordReader is used and none of the print statements run; an @Override annotation would have turned this into a compile-time error.
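
For reference, a minimal sketch of how the corrected class might look once everything comes from the org.apache.hadoop.mapreduce packages and the annotation is in place (a sketch based on the fixes above, not necessarily the exact final code; CustomRecordReader is the reader class from the question):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext; // mapreduce, not mapred
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class CustomInputFormat extends TextInputFormat {

    // With the mapreduce TaskAttemptContext in the parameter list this method
    // really overrides TextInputFormat.createRecordReader(), and @Override
    // makes the compiler verify that.
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        System.out.println(" ------------ INSIDE createRecordReader()--------------");
        return new CustomRecordReader();
    }
}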

Custom RecordReader

1) The imports were done from org.apache.hadoop.mapred.* instead of org.apache.hadoop.mapreduce.* (in the code above it is FileSplit that comes from the old package).
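
A corresponding skeleton of the corrected reader, showing only the declaration, imports and method signatures (the method bodies stay as in the question); again a sketch under the assumption that everything is switched to the mapreduce API:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; // mapreduce FileSplit, not mapred

// Parameterizing the reader with <LongWritable, Text> lets getCurrentKey() and
// getCurrentValue() return the concrete types the mapper expects.
public class CustomRecordReader extends RecordReader<LongWritable, Text> {

    private LongWritable key;
    private Text value;

    @Override
    public void initialize(InputSplit inputsplit, TaskAttemptContext taskattemptcontext)
            throws IOException, InterruptedException {
        // The cast now targets the mapreduce FileSplit the framework actually passes in.
        FileSplit split = (FileSplit) inputsplit;
        // ... open the file and position the LineReader as in the question ...
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // ... read the next NLINESTOPROCESS lines into 'value' as in the question ...
        return false;
    }

    @Override
    public LongWritable getCurrentKey() { return key; }

    @Override
    public Text getCurrentValue() { return value; }

    @Override
    public float getProgress() { return 0; }

    @Override
    public void close() throws IOException { }
}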

Regarding hadoop - Hadoop CustomInputFormat is not being called, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/20418440/
