gpt4 book ai didi

java - 在使用 hadoop 从原始日志中捕获异常时,一个完整的异常将被拆分为 2 个映射

转载 作者:可可西里 更新时间:2023-11-01 14:22:34 25 4
gpt4 key购买 nike

我想使用 hadoop 从原始日志中获取和解析异常。我遇到一个问题,一些异常(跨越多行)将成为 2 个不同拆分的一部分,因此是 2 个不同的映射器。

我有个想法可以避免这个问题。我可以覆盖 getSplits() 方法,使每个拆分都有一点冗余数据。我认为这个解决方案对我来说成本太高。

那么对于这个问题有没有人有更好的解决方案呢?

最佳答案

我会去做一个预处理工作,在其中用XML 标签 标记异常。接下来您可以使用 XMLInputformat 来处理这些文件。 (这只是解决方案的开始,根据您的反馈,我们可能会使事情变得更具体)

This link提供编写您自己的 XMLinputformat 的教程,您可以对其进行自定义以查找“异常”特征。本教程的重点是这句话:

In the event that a record spans a InputSplit boundary, the record reader will take care of this so we will not have to worry about this.

我会复制粘贴该网站的信息,因为它可能会在将来下线,这对于将来查看此网站的人来说可能会非常令人沮丧:

输入格式:

package org.undercloud.mapreduce.example3;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class XmlInputFormat extends FileInputFormat {

public RecordReader getRecordReader(InputSplit input, JobConf job, Reporter reporter)
throws IOException {

reporter.setStatus(input.toString());
return new XmlRecordReader(job, (FileSplit)input);
}

记录阅读器:注意:读取拆分末尾的逻辑位于 readUntilMatch 函数中,该函数如果有开放标记,则读取拆分末尾。我认为这正是您要找的!

package org.undercloud.mapreduce.example3;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class XmlRecordReader implements RecordReader {

private String startTagS = "";
private String endTagS = "";
private byte[] startTag;
private byte[] endTag;
private long start;
private long end;
private FSDataInputStream fsin;
private DataOutputBuffer buffer = new DataOutputBuffer();
private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;

public XmlRecordReader(JobConf job, FileSplit split) throws IOException {
lineReader = new LineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
startTag = startTagS.getBytes();
endTag = endTagS.getBytes();

// Open the file and seek to the start of the split
start = split.getStart();
end = start + split.getLength();
Path file = split.getPath();
FileSystem fs = file.getFileSystem(job);
fsin = fs.open(split.getPath());
fsin.seek(start);
}

public boolean next(Text key, XmlContent value) throws IOException {
// Get the next line
if (fsin.getPos() < end) {
if (readUntilMatch(startTag, false)) {
try {
buffer.write(startTag);
if (readUntilMatch(endTag, true)) {
key.set(Long.toString(fsin.getPos()));
value.bufferData = buffer.getData();
value.offsetData = 0;
value.lenghtData = buffer.getLength();
return true;
}
}
finally {
buffer.reset();
}
}
}
return false;
}

private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
int i = 0;
while (true) {
int b = fsin.read(); // End of file -> T
if (b == -1) return false;
// F-> Save to buffer:
if (withinBlock) buffer.write(b);
if (b == match[i]) {
i++;
if (i >= match.length) return true;
} else i = 0;
// see if we’ve passed the stop point:
if(!withinBlock && i == 0 && fsin.getPos() >= end) return false;
}
}

public Text createKey() {
return new Text("");
}

public XmlContent createValue() {
return new XmlContent();
}

public long getPos() throws IOException {
return lineReader.getPos();
}

public void close() throws IOException {
lineReader.close();
}

public float getProgress() throws IOException {
return lineReader.getProgress();
}
}

最后是可写的:

package org.undercloud.mapreduce.example3;

import java.io.*;

import org.apache.hadoop.io.*;

public class XmlContent implements Writable{

public byte[] bufferData;
public int offsetData;
public int lenghtData;


public XmlContent(byte[] bufferData, int offsetData, int lenghtData) {
this.bufferData = bufferData;
this.offsetData = offsetData;
this.lenghtData = lenghtData;
}

public XmlContent(){
this(null,0,0);
}

public void write(DataOutput out) throws IOException {
out.write(bufferData);
out.writeInt(offsetData);
out.writeInt(lenghtData);
}

public void readFields(DataInput in) throws IOException {
in.readFully(bufferData);
offsetData = in.readInt();
lenghtData = in.readInt();
}

public String toString() {
return Integer.toString(offsetData) + ", "
+ Integer.toString(lenghtData) +", "
+ bufferData.toString();
}

}

这看起来是一个非常有用的教程,解决了跨越多个拆分的记录问题。如果您能够根据您的问题调整此示例,请告诉我。

关于java - 在使用 hadoop 从原始日志中捕获异常时,一个完整的异常将被拆分为 2 个映射,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19511034/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com