
java - Getting Filename/File Data as key/value input for Map when running a Hadoop MapReduce Job


I have gone through the question How to get Filename/File Contents as key/value input for MAP when running a Hadoop MapReduce Job? here. Although it explains the concept, I have not been able to turn it into working code.

Basically, I want the file name as the key and the file data as the value. To do that I wrote a custom RecordReader, as recommended in the question above. But I could not figure out how to get the file name as the key in that class. Also, while writing the custom FileInputFormat class, I could not figure out how to return the custom RecordReader I wrote earlier.

The RecordReader code is:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class CustomRecordReader extends RecordReader<Text, Text> {

    private static final String LINE_SEPARATOR = System.getProperty("line.separator");

    private StringBuffer valueBuffer = new StringBuffer("");
    private Text key = new Text();
    private Text value = new Text();
    private RecordReader<Text, Text> recordReader;

    public SPDRecordReader(RecordReader<Text, Text> recordReader) {
        this.recordReader = recordReader;
    }

    @Override
    public void close() throws IOException {
        recordReader.close();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return recordReader.getProgress();
    }

    @Override
    public void initialize(InputSplit arg0, TaskAttemptContext arg1)
            throws IOException, InterruptedException {
        recordReader.initialize(arg0, arg1);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (valueBuffer.equals("")) {
            while (recordReader.nextKeyValue()) {
                valueBuffer.append(recordReader.getCurrentValue());
                valueBuffer.append(LINE_SEPARATOR);
            }
            value.set(valueBuffer.toString());
            return true;
        }
        return false;
    }
}

The incomplete FileInputFormat class is:

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class CustomFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1,
            Reporter arg2) throws IOException {
        return null;
    }
}

Best Answer

Put this code in your CustomRecordReader class. Note that it uses the old org.apache.hadoop.mapred API (the same one your CustomFileInputFormat imports), so the class should implement org.apache.hadoop.mapred.RecordReader<Text, Text> rather than extend the new-API org.apache.hadoop.mapreduce.RecordReader.

private LineRecordReader lineReader;
// LineRecordReader keys records by byte offset (a LongWritable); we only
// need it to fill in the value, so this placeholder key is discarded.
private LongWritable dummyKey = new LongWritable();
private String fileName;

public CustomRecordReader(JobConf job, FileSplit split) throws IOException {
    lineReader = new LineRecordReader(job, split);
    // remember the name of the file this split belongs to
    fileName = split.getPath().getName();
}

public boolean next(Text key, Text value) throws IOException {
    // get the next line; lineReader fills in the value text
    if (!lineReader.next(dummyKey, value)) {
        return false;
    }

    // the key is always the name of the current file
    key.set(fileName);

    return true;
}

public Text createKey() {
    return new Text("");
}

public Text createValue() {
    return new Text("");
}

Remove the SPDRecordReader constructor (it is a mistake: its name does not match the class name, so it will not compile as a constructor).
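
With those changes, every call to next() hands the mapper the file name as the key and one line of that file as the value. As a minimal sketch of how a mapper might consume those pairs (the class name FileNameMapper is an illustrative assumption, not part of the original answer):

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class FileNameMapper extends MapReduceBase
        implements Mapper<Text, Text, Text, Text> {

    @Override
    public void map(Text fileName, Text line,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        // Re-emit the pair unchanged: the file name stays the key,
        // so the reducer sees all lines of one file grouped together.
        output.collect(fileName, line);
    }
}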

And put this code in your CustomFileInputFormat, in place of the getRecordReader stub that returns null:

public RecordReader<Text, Text> getRecordReader(
        InputSplit input, JobConf job, Reporter reporter)
        throws IOException {

    reporter.setStatus(input.toString());
    return new CustomRecordReader(job, (FileSplit) input);
}

Regarding java - Getting Filename/File Data as key/value input for Map when running a Hadoop MapReduce Job, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/14212453/
