gpt4 book ai didi

hadoop - MapReduce 生成多份输出文件,以便加载到 Hive 的不同表中

转载 作者:可可西里 更新时间:2023-11-01 16:51:46 24 4
gpt4 key购买 nike

我在 hive 中有一个 MASTER 表和另外两个表

Master table contains

MsgId,NbOfTxs,InitgPty,PmtInf,DbtrAcct

Sub Master Table 1
MsgId,NbOfTxs,DbtrAcct

Sub Master Table 2
MsgId,NbOfTxs,InitgPty

数据是 XML 格式,我编写了 MR 代码来解析它。我想生成不同的 part-r 文件,以便把输出直接加载到 Hive 中对应的表里

如何使用 MapReduce 将输出文件直接放入或加载到 Hive 中相应的表?或者是否有更好的方法把这些文件导入 Hive 表?

下面是我的代码

package xmlcsvMR;
import javax.xml.stream.XMLStreamConstants;//XMLInputFactory;
import java.io.*;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import javax.xml.stream.*;

public class XmlParser11
{

/**
 * Input format that hands each XML fragment delimited by a configured start
 * tag and end tag to the mapper as a single record.
 *
 * <p>The tags are read from the job configuration under
 * {@link #START_TAG_KEY} and {@link #END_TAG_KEY}.
 */
public static class XmlInputFormat1 extends TextInputFormat {

    /** Configuration key holding the literal start tag of a record. */
    public static final String START_TAG_KEY = "xmlinput.start";
    /** Configuration key holding the literal end tag of a record. */
    public static final String END_TAG_KEY = "xmlinput.end";

    /**
     * Creates the reader that extracts tag-delimited blocks from a split.
     *
     * @param split   the split to read (unused here; consumed by {@code initialize})
     * @param context the task context (unused here; consumed by {@code initialize})
     * @return a new, uninitialized {@link XmlRecordReader}
     */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }

    /**
     * Reads through a file split and emits every block that begins with the
     * start tag and ends with the end tag as one record. The key is the
     * stream position right after the end tag; the value is the block's bytes,
     * tags included.
     */
    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();

        private final LongWritable key = new LongWritable();
        private final Text value = new Text();

        /**
         * Loads the tags from the configuration, opens the split's file and
         * seeks to the split's first byte.
         *
         * @throws IOException if a tag property is missing or empty, or the
         *                     file cannot be opened or positioned
         */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            startTag = requiredTag(conf, START_TAG_KEY);
            endTag = requiredTag(conf, END_TAG_KEY);
            FileSplit fileSplit = (FileSplit) split;

            // Open the file and seek to the start of the split.
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(file);
            fsin.seek(start);
        }

        /** Returns the UTF-8 bytes of a mandatory, non-empty tag property. */
        private static byte[] requiredTag(Configuration conf, String name)
                throws IOException {
            String tag = conf.get(name);
            if (tag == null || tag.isEmpty()) {
                // Fail with a readable message instead of an NPE here or an
                // index error later in readUntilMatch.
                throw new IOException(
                        "Missing required configuration property: " + name);
            }
            return tag.getBytes("utf-8");
        }

        /**
         * Advances to the next complete block.
         *
         * @return {@code true} if a block was read into the current key/value,
         *         {@code false} if no further start tag begins inside this
         *         split or the file ended before the block was closed
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            key.set(fsin.getPos());
                            value.set(buffer.getData(), 0, buffer.getLength());
                            return true;
                        }
                    } finally {
                        // Always drop buffered bytes so a partial block never
                        // leaks into the next record.
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,
                InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException,
                InterruptedException {
            return value;
        }

        /** Closes the underlying stream if it was opened. */
        @Override
        public void close() throws IOException {
            if (fsin != null) {
                fsin.close();
            }
        }

        /**
         * Reports the fraction of the split consumed so far, in [0, 1].
         * A record may finish past the split end, hence the clamp; an empty
         * split reports 1 rather than dividing zero by zero.
         */
        @Override
        public float getProgress() throws IOException {
            if (end == start) {
                return 1.0f;
            }
            return Math.min(1.0f, (fsin.getPos() - start) / (float) (end - start));
        }

        /**
         * Consumes bytes until {@code match} has been seen in full.
         *
         * @param match       the byte sequence to look for (non-empty)
         * @param withinBlock when {@code true}, every consumed byte is copied
         *                    into the record buffer and the split end is
         *                    ignored; when {@code false}, the search gives up
         *                    once the split end is passed with no partial match
         * @return {@code true} if the sequence was found, {@code false} on end
         *         of file or when the search was abandoned
         */
        private boolean readUntilMatch(byte[] match, boolean withinBlock)
                throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // End of file.
                if (b == -1) {
                    return false;
                }
                // Save to buffer.
                if (withinBlock) {
                    buffer.write(b);
                }
                // read() yields 0..255 while the tag bytes are signed, so
                // narrow before comparing or bytes >= 0x80 never match.
                byte current = (byte) b;
                if (current == match[i]) {
                    i++;
                    if (i >= match.length) {
                        return true;
                    }
                } else {
                    // The mismatching byte may itself open the tag (e.g.
                    // "<<Event>"). Restarting at 1 is exact whenever the first
                    // byte does not recur later in the tag.
                    i = (current == match[0]) ? 1 : 0;
                }
                // See if we've passed the stop point.
                if (!withinBlock && i == 0 && fsin.getPos() >= end) {
                    return false;
                }
            }
        }
    }
}


/**
 * Parses one XML record and emits the text of its {@code MsgId} element as
 * the key and the text of its {@code NbOfTxs} element as the value. Element
 * names are compared by local name, ignoring case. Text from repeated
 * elements of the same name is concatenated; both outputs are trimmed.
 */
public static class Map extends Mapper<LongWritable, Text, Text, Text> {

    /** Built once per mapper; creating a factory for every record is costly. */
    private final XMLInputFactory xmlFactory = newHardenedFactory();
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    /** Creates a StAX factory with DTDs and external entities disabled. */
    private static XMLInputFactory newHardenedFactory() {
        XMLInputFactory factory = XMLInputFactory.newInstance();
        // Records come from input files, so do not let them pull in external
        // resources through a DOCTYPE or an entity declaration.
        factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
        factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES,
                Boolean.FALSE);
        return factory;
    }

    /**
     * @param key     the record key supplied by the input format
     * @param value   the XML fragment to parse
     * @param context sink for the (MsgId, NbOfTxs) pair
     * @throws IOException if the fragment is not well-formed XML or the
     *                     output cannot be written
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringBuilder msgId = new StringBuilder();
        StringBuilder nbOfTxs = new StringBuilder();
        XMLStreamReader reader = null;
        try {
            // Feed the Text's own bytes to the parser; only the first
            // getLength() bytes of the backing array are valid. This avoids a
            // round trip through the platform default charset.
            reader = xmlFactory.createXMLStreamReader(
                    new ByteArrayInputStream(value.getBytes(), 0, value.getLength()));
            String currentElement = "";
            while (reader.hasNext()) {
                switch (reader.next()) {
                case XMLStreamConstants.START_ELEMENT:
                    currentElement = reader.getLocalName();
                    break;
                case XMLStreamConstants.END_ELEMENT:
                    // Text that follows a closing tag belongs to no element.
                    currentElement = "";
                    break;
                case XMLStreamConstants.CHARACTERS:
                    if (currentElement.equalsIgnoreCase("MsgId")) {
                        msgId.append(reader.getText());
                    } else if (currentElement.equalsIgnoreCase("NbOfTxs")) {
                        nbOfTxs.append(reader.getText());
                    }
                    break;
                default:
                    break;
                }
            }
        } catch (XMLStreamException e) {
            throw new IOException("Failed to parse XML record with key " + key, e);
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (XMLStreamException ignored) {
                    // Nothing useful can be done if releasing the parser fails.
                }
            }
        }
        outKey.set(msgId.toString().trim());
        outValue.set(nbOfTxs.toString().trim());
        context.write(outKey, outValue);
    }
}
public static class Reduce
extends Reducer<Text, Text, Text, Text> {
private Text outputKey = new Text();
public void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {

for (Text value : values) {
outputKey.set(constructPropertyXml(key, value));
context.write(outputKey, null);
}
}
public static String constructPropertyXml(Text name, Text value) {
StringBuilder sb = new StringBuilder();
sb.append("MsgID ").append(name)
.append(" NbOfTxs ").append(value);
return sb.toString();
}


}



/**
 * Configures and runs the XML parsing job.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @throws Exception if the job cannot be set up or submitted
 */
public static void main(String[] args) throws Exception
{
    if (args.length < 2) {
        System.err.println("Usage: XmlParser11 <input path> <output path>");
        System.exit(2);
    }

    Configuration conf = new Configuration();

    // Each <Event>...</Event> fragment becomes one mapper record.
    conf.set(XmlInputFormat1.START_TAG_KEY, "<Event>");
    conf.set(XmlInputFormat1.END_TAG_KEY, "</Event>");
    // TextOutputFormat reads "...separator"; "separatorText" is not a
    // recognized property and was silently ignored.
    conf.set("mapred.textoutputformat.separator", ",");

    // The Job constructors are deprecated in favor of the factory method.
    Job job = Job.getInstance(conf);
    job.setJarByClass(XmlParser11.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(XmlParser11.Map.class);
    job.setReducerClass(XmlParser11.Reduce.class);

    job.setInputFormatClass(XmlInputFormat1.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Propagate job failure to the caller through the exit status.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

最佳答案

尝试使用 MultipleOutputs。您可以用它写入不同的文件,从而生成多份输出副本,分别加载到 Hive 中。

这里(原文为链接)有一个基于 Hadoop 1.0.2 的很好的例子。

下面是取自 javadocs 的示例:

Usage in Reducer:

<K, V> String generateFileName(K k, V v) {
return k.toString() + "_" + v.toString();
}

public class MOReduce extends
Reducer<WritableComparable, Writable,WritableComparable, Writable> {
private MultipleOutputs mos;
public void setup(Context context) {
...
mos = new MultipleOutputs(context);
}

public void reduce(WritableComparable key, Iterator<Writable> values,
Context context)
throws IOException {
...
mos.write("text", , key, new Text("Hello"));
mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
...
}

public void cleanup(Context) throws IOException {
mos.close();
...
}

}

关于hadoop - MapReduce 生成多份输出文件,以便加载到 Hive 的不同表中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32440781/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com