gpt4 book ai didi

java - Hadoop:如何获取 CombineFileInputFormat 中的每个文件路径?

转载 作者:可可西里 更新时间:2023-11-01 15:03:05 25 4
gpt4 key购买 nike

我有很多文件,其中一些很小。为了减少映射器的数量,我想使用 CombineFileInputFormat。文件名将用作映射器输出的键的一部分。

我尝试了如下几种方法来获取CombineFileSplit中每个chunk的文件名,但都失败了。

1) 我在函数中看到 conf.set("map.input.file", split.getPath(idx).toString());

initNextRecordReader()CombineFileRecordReader。但是 NullPointerException

发生在我的 map() 函数中,如 context.getConfiguration().get("map.input.file")

返回 null

2) 我也在映射器中尝试 ((FileSplit) (context.getInputSplit())).getPath().getName(),但是 java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.CombineFileSplit cannot be cast to org.apache.hadoop.mapreduce.lib.input.FileSplit 发生了。

那么如何获取 CombineFileSplit 中的每个文件名?

============================================= =============

输入文件是 lzo 压缩的,暂时没有索引。

以下是我的代码:

我这样实现 CombineFileInputFormat:

public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit arg0,
TaskAttemptContext arg1) throws IOException {
// TODO Auto-generated method stub
return new CombineFileRecordReader<LongWritable, Text>((CombineFileSplit) arg0, arg1, CombineLzoLineRecordReader.class);
}

}

这是扩展 LzoLineRecordReader 的 CombineLzoLineRecordReader:

public class CombineLzoLineRecordReader extends LzoLineRecordReader {
private int index;

public CombineLzoLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
throws IOException, InterruptedException {
this.index = index;
}

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
CombineFileSplit combineSplit = (CombineFileSplit) genericSplit;
FileSplit fileSplit = new FileSplit(combineSplit.getPath(index), combineSplit.getOffset(index), combineSplit.getLength(index), combineSplit.getLocations());
super.initialize(fileSplit, context);
}
}

我的 map 方法是这样的:

private String getName(String filePath) {
String[] filePathDir = filePath.split("/");
return filePathDir[filePathDir.length - 1];
}

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

String name = getName(context.getConfiguration().get("map.input.file"));

line = new String(value.getBytes(), 0, value.getLength(), "ISO-8859-1");
lineFields = line.split("\t",-1);
if (lineFields != null && lineFields.length >= 20) {
// do something ...
}
}

错误信息:

13/06/14 17:02:50 INFO mapred.JobClient: Task Id : attempt_201209101415_762760_m_000000_0, Status : FAILED
java.lang.NullPointerException
at com.netease.devilfish.hadoop.job.LogAnalysisDailyMapper.getName(Unknown Source)

最佳答案

CombineFileRecordReader 在任务上下文而不是映射器上下文中设置输入路径。它们是两个不同的上下文对象,因此是第一个错误。我一直面临同样的问题,这就是我解决的方法

由于 CombineLzoLineRecordReader 和 mapper 类将在同一个 jvm 中运行,您可以通过静态变量共享数据。如下修改 CombineLzoLineRecordReader 类(星号中的更改)

public class CombineLzoLineRecordReader extends LzoLineRecordReader {
private int index;
**private static String currentPath**;

public CombineLzoLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
throws IOException, InterruptedException {
this.index = index;
}

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
CombineFileSplit combineSplit = (CombineFileSplit) genericSplit;
FileSplit fileSplit = new FileSplit(combineSplit.getPath(index), combineSplit.getOffset(index), combineSplit.getLength(index), combineSplit.getLocations());
**currentPath = fileSplit.getPath().toString();**
super.initialize(fileSplit, context);
}

public static String getCurrentFilePath() {
return currentFilePath;
}
}

在您的映射器代码中使用 CombineLzoLineRecordReader.getCurrentFilePath() 来获取文件路径。

关于java - Hadoop:如何获取 CombineFileInputFormat 中的每个文件路径?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17105173/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com