xml - Spark master not invoking custom InputFormat

Reposted · Author: 可可西里 · Updated: 2023-11-01 14:53:39

I am exploring Apache Spark, and as part of that I want to customize an InputFormat. In my case I need to read XML files and turn each occurrence of <text> into a new record.

I did write a custom TextInputFormat (XMLRecordInputFormat.java) that returns a custom XMLRecordReader extending org.apache.hadoop.mapreduce.RecordReader.

But I cannot work out why the Spark master never invokes the custom input format (XMLRecordInputFormat.class). For some reason it keeps behaving like the default line splitter.

Here is the code:

import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

public class CustomizedXMLReader {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("CustomizedXMLReader")
                .set("spark.executor.memory", "512m")
                .set("record.delimiter.regex", "</bermudaview>");

        JobConf jobConf = new JobConf(new Configuration(), CustomizedXMLReader.class);
        jobConf.setInputFormat(XMLRecordInputFormat.class);
        FileInputFormat.setInputPaths(jobConf, new Path(args[0]));

        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaPairRDD<LongWritable, Text> lines = ctx.hadoopRDD(jobConf,
                XMLRecordInputFormat.class, LongWritable.class, Text.class);

        Function<Tuple2<LongWritable, Text>, XMLRecord> keyData =
                new Function<Tuple2<LongWritable, Text>, XMLRecord>() {
                    @Override
                    public XMLRecord call(Tuple2<LongWritable, Text> arg0) throws Exception {
                        System.out.println(arg0.toString());
                        XMLRecord record = new XMLRecord();
                        record.setPos(Long.parseLong(arg0._1.toString()));
                        record.setXml(arg0._2.toString());
                        return record;
                    }
                };

        JavaRDD<XMLRecord> words = lines.map(keyData);

        List<XMLRecord> tupleList = words.collect();
        Iterator<XMLRecord> itr = tupleList.iterator();
        while (itr.hasNext()) {
            XMLRecord t = itr.next();
            System.out.println(t.getXml());
            System.out.println(t.getPos());
        }
    }
}

// The custom InputFormat implementation follows.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class XMLRecordInputFormat extends TextInputFormat {

    // With this signature the method overrides nothing on
    // org.apache.hadoop.mapred.TextInputFormat (whose hook is
    // getRecordReader(InputSplit, JobConf, Reporter)), so Spark keeps
    // falling back to the default LineRecordReader.
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit arg0, JobConf arg1, Reporter arg2) throws IOException {
        return new XMLRecordReader();
    }
}

Best Answer

I think I figured it out.

I was confusing the two APIs: org.apache.hadoop.mapred.RecordReader (an interface) and org.apache.hadoop.mapreduce.RecordReader (a class), and likewise which InputFormat to use with each.

It turns out that org.apache.hadoop.mapred.FileInputFormat and org.apache.hadoop.mapred.RecordReader do go hand in hand. Below is the complete code that parses the XML into a JavaRDD.

In this example I want to parse out and extract each <name> XML tag.
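
As an aside (not part of the original answer), here is a minimal sketch of how the two Hadoop APIs pair with Spark's entry points; the stock TextInputFormat classes and the args[0] input path are illustrative assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ApiPairingSketch {

    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(
                new SparkConf().setMaster("local").setAppName("ApiPairingSketch"));

        // Old "mapred" API: the InputFormat's hook is
        // getRecordReader(InputSplit, JobConf, Reporter); pair it with hadoopRDD(...).
        JobConf jobConf = new JobConf();
        FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
        JavaPairRDD<LongWritable, Text> oldApi = ctx.hadoopRDD(jobConf,
                org.apache.hadoop.mapred.TextInputFormat.class,
                LongWritable.class, Text.class);

        // New "mapreduce" API: the hook is
        // createRecordReader(InputSplit, TaskAttemptContext); it must go through
        // newAPIHadoopFile(...) / newAPIHadoopRDD(...) instead.
        JavaPairRDD<LongWritable, Text> newApi = ctx.newAPIHadoopFile(args[0],
                org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class,
                LongWritable.class, Text.class, new Configuration());

        System.out.println(oldApi.count() + " / " + newApi.count());
        ctx.close();
    }
}

The compiler enforces the pairing: hadoopRDD(...) only accepts org.apache.hadoop.mapred.InputFormat subclasses, which is why a createRecordReader(...) method added to an old-API class is simply never called.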

Main class

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

public class CustomizedXMLReader implements Serializable {

    private static final long serialVersionUID = 1L;

    public static void main(String[] args) {
        CustomizedXMLReader reader = new CustomizedXMLReader();
        reader.readUsingFileInputFormat(args);
    }

    /**
     * Does all the reading through the org.apache.hadoop.mapred.RecordReader
     * interface. This works as expected.
     *
     * @param args args[0] is the input path
     */
    public void readUsingFileInputFormat(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("CustomizedXMLReader")
                .set("spark.executor.memory", "512m")
                .set("record.delimiter.regex", "</name>");

        JobConf jobConf = new JobConf(new Configuration(), CustomizedXMLReader.class);
        jobConf.setInputFormat(XMLRecordFileInputFormat.class);
        FileInputFormat.setInputPaths(jobConf, new Path(args[0]));

        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaPairRDD<Text, Text> lines = ctx.hadoopRDD(jobConf,
                XMLRecordFileInputFormat.class, Text.class, Text.class);

        Function<Tuple2<Text, Text>, XMLRecord> keyData =
                new Function<Tuple2<Text, Text>, XMLRecord>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public XMLRecord call(Tuple2<Text, Text> arg0) throws Exception {
                        System.out.println(arg0.toString());
                        XMLRecord record = new XMLRecord();
                        record.setPos(arg0._1.toString());
                        record.setXml(arg0._2.toString());
                        return record;
                    }
                };

        JavaRDD<XMLRecord> words = lines.map(keyData);

        List<XMLRecord> tupleList = words.collect();
        Iterator<XMLRecord> itr = tupleList.iterator();
        while (itr.hasNext()) {
            XMLRecord t = itr.next();
            System.out.println(t.getXml());
            System.out.println(t.getPos());
        }
    }
}
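
The post never shows the XMLRecord bean the map function fills in. A hypothetical minimal reconstruction, matching how the answer's code calls it (setPos and setXml both take Strings here; the question's version passed a long to setPos), might look like the following. It must be Serializable because collect() ships instances back to the driver:

import java.io.Serializable;

// Hypothetical reconstruction of the XMLRecord bean referenced above.
public class XMLRecord implements Serializable {

    private static final long serialVersionUID = 1L;

    private String pos; // record position/sequence number, kept as text
    private String xml; // the extracted XML fragment

    public String getPos() {
        return pos;
    }

    public void setPos(String pos) {
        this.pos = pos;
    }

    public String getXml() {
        return xml;
    }

    public void setXml(String xml) {
        this.xml = xml;
    }
}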

The RecordReader

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.streaming.StreamXmlRecordReader;

public class XMLInterfaceRecordReader implements org.apache.hadoop.mapred.RecordReader<Text, Text> {

    private StreamXmlRecordReader in;
    private String delimiterRegex;
    private long start;
    private long pos;
    private long end;
    private static Long keyInt = 0L;

    public XMLInterfaceRecordReader(InputSplit split, JobConf arg1, Reporter rep) throws IOException {
        super();
        FileSplit fSplit = (FileSplit) split;

        this.delimiterRegex = "</name>";

        start = fSplit.getStart();
        end = start + fSplit.getLength();

        // Tell StreamXmlRecordReader which markers delimit one record.
        arg1.set("stream.recordreader.begin", "<name>");
        arg1.set("stream.recordreader.end", delimiterRegex);

        final Path file = fSplit.getPath();
        FileSystem fs = file.getFileSystem(arg1);
        FSDataInputStream fileIn = fs.open(fSplit.getPath());

        if (start != 0) {
            --start;
            fileIn.seek(start);
        }

        in = new StreamXmlRecordReader(fileIn, fSplit, rep, arg1, fs);
        this.pos = start;
    }

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    @Override
    public Text createKey() {
        return new Text();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        }
        return Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    @Override
    public boolean next(Text key, Text value) throws IOException {
        in.seekNextRecordBoundary();

        // StreamXmlRecordReader hands back the matched record in its key;
        // its value stays empty, so the record text is copied out of the key.
        Text record = new Text();
        Text empty = new Text();
        in.next(record, empty);

        if (record.getLength() > 0) {
            System.out.println(record.toString());
            System.out.println(empty.toString());
            start += in.getPos();
            key.set(new LongWritable(++keyInt).toString());
            value.set(record.toString());
            return true;
        }
        return false;
    }
}
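
For completeness, here is a small, hypothetical driver (not from the original answer) that exercises StreamXmlRecordReader directly against a local file, using the same begin/end markers as the reader above; the sample.xml file name and the single whole-file split are assumptions:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.streaming.StreamXmlRecordReader;

public class StreamXmlDemo {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();
        // The same begin/end markers the record reader above configures.
        conf.set("stream.recordreader.begin", "<name>");
        conf.set("stream.recordreader.end", "</name>");

        Path file = new Path(args.length > 0 ? args[0] : "sample.xml"); // assumed test file
        FileSystem fs = file.getFileSystem(conf);
        long length = fs.getFileStatus(file).getLen();
        FSDataInputStream in = fs.open(file);

        // One split covering the whole file keeps the demo simple.
        FileSplit split = new FileSplit(file, 0, length, (String[]) null);

        StreamXmlRecordReader reader =
                new StreamXmlRecordReader(in, split, Reporter.NULL, conf, fs);
        reader.init(); // seek to the first record boundary, as StreamInputFormat does

        Text key = new Text();
        Text value = new Text();
        // Each matched <name>...</name> block arrives in the key.
        while (reader.next(key, value)) {
            System.out.println(key);
        }
        reader.close();
    }
}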

The FileInputFormat

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class XMLRecordFileInputFormat extends FileInputFormat<Text, Text> {

    // Never assigned, so the null check below always falls through and a
    // fresh reader is created per split.
    XMLInterfaceRecordReader reader = null;

    public XMLRecordFileInputFormat() {
    }

    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit arg0,
            JobConf arg1, Reporter arg2) throws IOException {
        if (reader != null) {
            return reader;
        }
        return new XMLInterfaceRecordReader(arg0, arg1, arg2);
    }
}

This post, "xml - Spark master not invoking custom InputFormat", is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/27226553/
