- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在尝试探索 Apache Spark,作为其中的一部分,我想自定义 InputFormat。就我而言,我想阅读 xml
文件并转换每次出现的 <text>
到新记录。
我确实写了定制TextInputFormat
(XMLRecordInputFormat.java) 返回自定义 **XMLRecordReader extends org.apache.hadoop.mapreduce.RecordReader**
但我不明白为什么 Spark master 不调用自定义输入格式 (XMLRecordInputFormat.class)
?由于某种原因,它继续表现得像普通的分线器。
代码如下:
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
public class CustomizedXMLReader{
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf()
.setMaster("local")
.setAppName("CustomizedXMLReader")
.set("spark.executor.memory", "512m").set("record.delimiter.regex", "</bermudaview>");
JobConf jobConf = new JobConf(new Configuration(), CustomizedXMLReader.class);
jobConf.setInputFormat(XMLRecordInputFormat.class);
FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaPairRDD<LongWritable, Text> lines = ctx.hadoopRDD(jobConf,XMLRecordInputFormat.class, LongWritable.class, Text.class);
Function<Tuple2<LongWritable, Text>, XMLRecord> keyData =
new Function<Tuple2<LongWritable, Text>, XMLRecord>() {
@Override
public XMLRecord call(Tuple2<LongWritable, Text> arg0)
throws Exception {
// TODO Auto-generated method stub
System.out.println(arg0.toString());
XMLRecord record = new XMLRecord();
record.setPos(Long.getLong(arg0._1.toString()));
record.setXml(arg0._2.toString());
return record;
}
};
JavaRDD<XMLRecord> words = lines.map(keyData);
List<XMLRecord> tupleList = words.collect();
Iterator<XMLRecord> itr = tupleList.iterator();
while(itr.hasNext()){
XMLRecord t = itr.next();
System.out.println(t.getXml());
System.out.println(t.getPos());
}
}
}
//following custom InputFormat implementation
public class XMLRecordInputFormat extends TextInputFormat{
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit arg0, JobConf arg1, Reporter arg2) throws IOException {
// TODO Auto-generated method stub
XMLRecordReader r = new XMLRecordReader();
return r;
}
}
最佳答案
我想我想出了办法。
我对 API 以及 org.apache.hadoop.mapred.RecordReader(接口(interface))和 org.apache.hadoop.mapreduce.RecordReader(类)感到困惑。还有要使用的 InputFormat。
看起来 FileInputFormat 和 org.apache.hadoop.mapred.RecordReader 确实是齐头并进的。请找到将 XML 解析为 JavaRDD 的完整代码。
在这个例子中,我希望解析和提取 XML 标签....
主类
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
public class CustomizedXMLReader implements Serializable{
private static final long serialVersionUID = 1L;
public static void main(String[] args) {
CustomizedXMLReader reader = new CustomizedXMLReader();
reader.readUsingFileInputFormat(args);
}
/**
* Doing all reading using org.apache.hadoop.mapred.RecordReader interface. This is doing good.
* @param args
*/
public void readUsingFileInputFormat(String[] args){
SparkConf sparkConf = new SparkConf()
.setMaster("local")
.setAppName("CustomizedXMLReader")
.set("spark.executor.memory", "512m").set("record.delimiter.regex", "</name>");
JobConf jobConf = new JobConf(new Configuration(), CustomizedXMLReader.class);
jobConf.setInputFormat(XMLRecordFileInputFormat.class);
FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaPairRDD<Text, Text> lines = ctx.hadoopRDD(jobConf,XMLRecordFileInputFormat.class, Text.class, Text.class);
Function<Tuple2<Text, Text>, XMLRecord> keyData =
new Function<Tuple2<Text, Text>, XMLRecord>() {
private static final long serialVersionUID = 1L;
@Override
public XMLRecord call(Tuple2<Text, Text> arg0)
throws Exception {
System.out.println(arg0.toString());
XMLRecord record = new XMLRecord();
record.setPos(arg0._1.toString());
record.setXml(arg0._2.toString());
return record;
}
};
JavaRDD<XMLRecord> words = lines.map(keyData);
List<XMLRecord> tupleList = words.collect();
Iterator<XMLRecord> itr = tupleList.iterator();
while(itr.hasNext()){
XMLRecord t = itr.next();
System.out.println(t.getXml());
System.out.println(t.getPos());
}
}
}
记录器
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.streaming.StreamXmlRecordReader;
public class XMLInterfaceRecordReader implements org.apache.hadoop.mapred.RecordReader<Text,Text>{
private StreamXmlRecordReader in;
private String delimiterRegex;
private long start;
private long pos;
private long end;
private static Long keyInt = 0L;
public XMLInterfaceRecordReader(InputSplit split, JobConf arg1, Reporter rep) throws IOException {
super();
FileSplit fSplit = (FileSplit) split;
this.delimiterRegex = "</name>";
start = fSplit.getStart();
end = start + fSplit.getLength();
arg1.set("stream.recordreader.begin", "<name>");
arg1.set("stream.recordreader.end", delimiterRegex);
final Path file = fSplit.getPath();
FileSystem fs = file.getFileSystem(arg1);
FSDataInputStream fileIn = fs.open(fSplit.getPath());
boolean skipFirstLine = false;
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new StreamXmlRecordReader(fileIn, fSplit,rep, arg1,fs);
this.pos = start;
}
@Override
public void close() throws IOException {
if (in != null) {
in.close();
}
}
@Override
public Text createKey() {
return new Text();
}
@Override
public Text createValue() {
return new Text();
}
@Override
public long getPos() throws IOException {
return pos;
}
@Override
public float getProgress() throws IOException {
if (start == end) {
return (long) 0.0f;
}
else {
return (long) Math.min(1.0f, (pos - start) / (float)(end - start));
}
}
@Override
public boolean next(Text Key, Text Value) throws IOException {
in.seekNextRecordBoundary();
Text key = new Text();
Text val = new Text();
in.next(key, val);
if(key.toString() != null && key.toString().length() > 0){
System.out.println(key.toString());
System.out.println(val.toString());
start += in.getPos();
Key.set(new LongWritable(++keyInt).toString());
Value.set(key.toString());
return true;
}else
return false;
}
}
文件输入格式
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class XMLRecordFileInputFormat extends FileInputFormat<Text, Text>{
XMLInterfaceRecordReader reader = null;
public XMLRecordFileInputFormat(){
}
@Override
public RecordReader<Text, Text> getRecordReader(InputSplit arg0,
JobConf arg1, Reporter arg2) throws IOException {
if(reader != null)
return reader;
else
return new XMLInterfaceRecordReader(arg0,arg1,arg2);
}
}
关于xml - Spark master 不调用 Custom InputFormat,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27226553/
我在大学学习C++时学习了这段代码..后来我在C#中使用了同样的东西...但现在我想在Java中使用它...我在互联网上寻找类似的东西,但我什至不知道如何表达它,以便我得到正确的结果。 所以嗯,请让我
我正在我的 Ruby on Rails Controller 上运行 RSPEC 测试,这是我正在测试的 Controller 操作: Controller 代码: class Customers::
想为我选择的选项卡设置自定义背景,到目前为止,子类化是我自定义 UITAbBar/UITabBarItem 的方式。 问题是:有谁知道(或知道我在哪里可以找到)设置背景的属性是什么? 所选选项卡周围有
您好,我在 commerefacades-beans.xml 中创建了 eProductForm bean,我添加了 ProductData 的自定义属性。 然后在commercewebs
我有两个表:1. 客户2. customer_order 客户表包含客户数据(duh),customer_order 包含所有订单。我可以在 customer.id=customer_order.id
在我的 TableView 中,我有一个 NSMutableArray *currList 的数据源 - 它包含对象 Agent 的对象。我创建了自定义的 TableCell 并正确设置了所有内容。我
是否建议使用自引用泛型继承? public abstract class Entity { public Guid Id {get; set;} public int Version
我正在尝试为我的 Grafana 安装使用自定义文件 ( custom.ini )。不幸的是,这不起作用。 我做了什么: 安装了一台装有 CentOS 7 的虚拟机 添加了 Grafana Yum R
我被分配了两个给定类的作业,一个是抽象父类 Lot.java,另一个是测试类 TestLots.java。我不应该编辑其中任何一个。任务是创建Lot的两个子类,使TestLots中的错误不再是错误。
我是 Botpress 的新手。 我刚刚安装了 Botpress 的最新版本“botpress-ce-v11_0_1-win-x64”。 我浏览了文档,发现了一些关于内容类型、内容元素和内容渲染的解释
我一直在四处寻找,但我还没有找到任何东西,除了 Qt3 的旧文档和 qt 设计器的 3.x 版。 我会举个例子,并不是因为我的项目是 GPL 而不能提供代码,而是为了简单起见。 示例:您正在为您的应用
场景 我有一个自定义规则来验证订单的运费: public class OrderValidator : BaseValidator { private string CustomInfo {
我有用于身份验证的自定义拦截器: @Named("authInterceptor") @Provides @Singleton fun providesAuthIntercep
如果有人没有添加照片,我想显示默认头像图像。我假设我需要在模型或助手中执行自定义 getter。 如果我做 getter,它会看起来像这样吗: def avatar_url "default_ur
我正在使用 Google Search API,但遇到了一些麻烦。这个请求(在 Python 中,使用 requests 库)工作正常 res = requests.get("https://www.
我使用 MSKLC 制作了自定义键盘布局。 我以为我仔细按照说明操作了chose appropriate values对于LOCALENAME和 LOCALID参数。 但是,在通过按 Win+Spac
我正在使用 simpleframework解析 XML 字符串并将其转换为对象。 Serializer serializer = new Persister(); try { Customer
我正在使用 C# 控制台应用程序从 MySql 数据库获取一些数据,但在正确查询时遇到一些问题 现在的情况: SELECT * FROM Customer WHERE EXISTS ( SELECT
我在我的 iPhone 4S 上运行我的应用程序,我正在使用自定义表格 View Controller 和自定义表格 View 单元格,当我将表格 View 向上滑动到空白区域并同样向下滑动到空白区域
我有一个自定义的 JavaScript 变量,它正在检查 eventAction 是什么,这样我就可以知道是否触发一些转换像素。自定义 Javascript 称为“FacebookConversion
我是一名优秀的程序员,十分优秀!