
java - How to use CombineFileInputFormat in Hadoop?


I want to use the CombineFileInputFormat of Hadoop 0.20.0/0.20.2 so that it processes one file per record, while still not compromising data locality (which it normally takes care of).

Tom White's Hadoop: The Definitive Guide mentions it, but he does not show how to do it. Instead, he moves on to sequence files.

I am quite confused about the meaning of the processed variable in the record reader. Any code example would be a great help.

Thanks in advance.

Best answer

Have a look at the input format below for using CombineFileInputFormat.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

/**
 * Custom input format which implements the createRecordReader of the abstract
 * class CombineFileInputFormat. Each mapper receives a CombineFileSplit that
 * packs several small files; reading each file is delegated to a
 * LineRecordReader.
 */
public class MyCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    public static class MyRecordReader extends RecordReader<LongWritable, Text> {

        // Reads one of the files contained in the CombineFileSplit, line by line.
        private final LineRecordReader delegate;
        // Index of the file within the CombineFileSplit that this reader handles.
        private final int idx;

        // CombineFileRecordReader instantiates this class reflectively, so the
        // constructor must have exactly this signature:
        // (CombineFileSplit, TaskAttemptContext, Integer).
        public MyRecordReader(CombineFileSplit split, TaskAttemptContext taskcontext, Integer idx)
                throws IOException {
            this.idx = idx;
            this.delegate = new LineRecordReader();
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext taskcontext) throws IOException {
            // Turn the idx-th file of the combined split into an ordinary FileSplit
            // and hand it to the LineRecordReader.
            CombineFileSplit csplit = (CombineFileSplit) split;
            FileSplit fileSplit = new FileSplit(
                    csplit.getPath(idx),
                    csplit.getOffset(idx),
                    csplit.getLength(idx),
                    csplit.getLocations());
            delegate.initialize(fileSplit, taskcontext);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return delegate.nextKeyValue();
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return delegate.getCurrentKey();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return delegate.getCurrentValue();
        }

        @Override
        public float getProgress() {
            try {
                return delegate.getProgress();
            } catch (Exception e) {
                return 0;
            }
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext taskcontext) throws IOException {
        // CombineFileRecordReader creates one MyRecordReader per file packed
        // into the CombineFileSplit and iterates over them in turn.
        return new CombineFileRecordReader<LongWritable, Text>(
                (CombineFileSplit) split, taskcontext, MyRecordReader.class);
    }
}
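For completeness, here is a minimal driver sketch showing how such an input format is typically wired into a job. The class name CombineDriver, the identity Mapper, the paths taken from args, and the 128 MB split limit are illustrative assumptions, not part of the original answer; FileInputFormat.setMaxInputSplitSize caps how much data CombineFileInputFormat packs into one split (without a cap, all input files may end up in a single split and a single mapper).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "combine-small-files"); // old-style constructor, matches the 0.20.x API

        job.setJarByClass(CombineDriver.class);

        // Use the custom combine format instead of TextInputFormat.
        job.setInputFormatClass(MyCombineFileInputFormat.class);

        // Upper bound on the size of a combined split (example value: 128 MB).
        FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);

        // Identity mapper for illustration; output types match LineRecordReader's key/value.
        job.setMapperClass(Mapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}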

Regarding "java - How to use CombineFileInputFormat in Hadoop?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/10380200/
