- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我是 hadoop 的新手,使用 Hadoop 2.6.0 版本并尝试解析复杂的 XML。搜索了一段时间后,我了解到对于 XML 解析,我们需要编写自定义 InputFormat,它是 mahout 的 XMLInputFormat。我也得到了this example的帮助
但是当我在 passig XMLInputformat 类之后运行我的代码时,如果我使用示例中给出的 XMLInputFormat,它不会调用我自己的 Mapper 类并且输出文件中有 0 个数据。
令人惊讶的是,如果我没有将我的 XMLInputFormat 类传递给我的 JOB,那么我的映射器可以正常工作并正确提供输出。有人会在这里帮助指出我在这里缺少的东西吗?
我的作业配置类是:
public static void runParserJob(String inputPath, String outputPath) throws IOException {
LOGGER.info("-----runParserJob()-----Start");
Configuration configuration = new Configuration(); configuration.set("xmlinput.start",Constants.XML_INPUT_START_TAG_PRODUCT);
configuration.set("xmlinput.end",Constants.XML_INPUT_END_TAG_PRODUCT);
configuration.set("io.serializations",Constants.XML_IO_SERIALIZATIONS);
Job job = new Job(configuration,Constants.JOB_TITLE);
FileInputFormat.setInputPaths(job, inputPath);
job.setJarByClass(ParserDriver.class);
job.setMapperClass(XMLMapper.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(XmlInputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
Path hdfsOutputPath = new Path(outputPath);
FileOutputFormat.setOutputPath(job, hdfsOutputPath);
FileSystem dfs = FileSystem.get(hdfsOutputPath.toUri(),configuration);
/**Using this condition it will create output at same location
* by deleting older data in that location**/
if(dfs.exists(hdfsOutputPath)){
dfs.delete(hdfsOutputPath,true);
}
try{
job.waitForCompletion(true);
}catch(InterruptedException ie){
LOGGER.error("-----Process interrupted in between Exception-----", ie);
}catch(ClassNotFoundException ce){
LOGGER.error("-----Class not found while running the job-----",ce);
}
}
我的 XMLInputFormat 类是:
public class XmlInputFormat extends TextInputFormat{
public static final String START_TAG_KEY = "xmlinput.start";
public static final String END_TAG_KEY = "xmlinput.end";
@Override
public RecordReader<LongWritable,Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
return new XmlRecordReader();
}
public static class XmlRecordReader extends RecordReader<LongWritable, Text>{
private byte[] startTag;
private byte[] endTag;
private long start;
private long end;
private FSDataInputStream fsin;
private DataOutputBuffer buffer = new DataOutputBuffer();
private LongWritable key = new LongWritable();
private Text value = new Text();
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit)inputSplit;
startTag = taskAttemptContext.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
endTag = taskAttemptContext.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");
start = fileSplit.getStart();
end = start + fileSplit.getLength();
Path file = fileSplit.getPath();
FileSystem hdfs = file.getFileSystem(taskAttemptContext.getConfiguration());
fsin = hdfs.open(fileSplit.getPath());
fsin.seek(start);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if(fsin.getPos() < end){
if(readUntilMatch(startTag,false)){
try {
buffer.write(startTag);
if (readUntilMatch(endTag, true)) {
value.set(buffer.getData(), 0, buffer.getLength());
key.set(fsin.getPos());
return true;
}
} finally {
buffer.reset();
}
}
}
return false;
}
@Override
public void close() throws IOException {
}
@Override
public LongWritable getCurrentKey() throws IOException,InterruptedException {
return null;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return null;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException{
int i = 0;
while(true){
int b = fsin.read();
//If reaches to EOF
if(b == -1){
return false;
}
//If not then save into the buffer.
if(withinBlock){
buffer.write(b);
}
// check if we're matching:
if (b == match[i]) {
i++;
if (i >= match.length) return true;
} else i = 0;
// see if we've passed the stop point:
if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
}
}
}
有人可以帮我吗?提前致谢。如果我哪里出错了,请纠正我。
最佳答案
我不确定您的 XML 结构是什么样的,但是例如,如果您有一个 XML 结构:
<data>
<product id="101" itemCategory="BER" transaction="PUR">
<transaction-id>102A5RET</transaction-id>
<item-name>Blue-Moon-12-PK-BTTLE</item-name>
<item-purchased>2</item-purchased>
<item-price>12.99</item-price>
<time-stamp>2015-04-20 11:12:13 102301</time-stamp>
</product>
.
.
.
</data>
您的 XMLInputFormat 类需要知道您要使用哪个 XML 节点:
configuration.set("xmlinput.start", "<product") //note only <product
configuration.set("xmlinput.end", "</product>") //note only </product>
希望这会有所帮助!
关于java - 使用 XMLInputFormat 在 hadoop 中解析 xml 时不执行我的 hadoop 映射器类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30396808/
有没有类似Mahout's XmlInputFormat的东西但是对于 Flink 呢? 我有一个很大的 XML 文件,我想提取特定的元素。在我的例子中,它是一个维基百科转储,我需要得到所有 标签。
Mahout 的 XmlInputFormat 能否在不覆盖其任何方法的情况下处理 gzip 压缩数据?我一直在尝试解析经过 gzip 压缩的维基百科 xml 数据,但到目前为止都没有成功。 我听说
我正在尝试使用 Hadoop 进行 WordCount。我想使用 XmlInputFormat.class 根据 XML 标记拆分文件。 XmlInputFormat.class 是 here Xml
我想在hadoop 中解析xml 文件。它包含一个标签,如 如果我在 xmlinputformat start_tag_key 中将 article 作为我的开始标记,它无法识别 article 标
我是 Hadoop MapReduce 的新手(准确地说是 4 天),我被要求在集群上执行分布式 XML 解析。根据我在 Internet 上的(重新)搜索,使用 Mahout 的 XmlInputF
我是 hadoop 的新手,使用 Hadoop 2.6.0 版本并尝试解析复杂的 XML。搜索了一段时间后,我了解到对于 XML 解析,我们需要编写自定义 InputFormat,它是 mahout
我是一名优秀的程序员,十分优秀!