java - Pig custom loader's getNext() is called again and again

Reposted. Author: 可可西里. Last updated: 2023-11-01 16:13:51

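I have started using Apache Pig for one of our projects. I need a custom input format to load our data files, and for that I followed this example: Hadoop: Custom Input Format. I also wrote a custom RecordReader implementation that reads the data (we receive it from another application in a binary format) and parses it into proper JSON.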

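The problem appears when I use my custom loader in a Pig script. As soon as my loader's getNext() method is called, it calls the nextKeyValue() method of my custom RecordReader, which works fine. It reads the data correctly and passes it back to my loader, which parses the data and returns a Tuple. So far so good.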

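The trouble is that my loader's getNext() method is called again and again. Each call works fine and returns the correct output (I stepped through it in the debugger up to the return statement). But instead of execution moving on, my loader is simply called again. I counted the calls and saw the number climb all the way to 20K!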

Can someone help me understand what is wrong with my code?

Loader

public class SimpleTextLoaderCustomFormat extends LoadFunc {

    protected RecordReader in = null;
    private byte fieldDel = '\t';
    private ArrayList<Object> mProtoTuple = null;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();

    @Override
    public Tuple getNext() throws IOException {
        Tuple t = null;
        try {
            boolean notDone = in.nextKeyValue();
            if (!notDone) {
                return null;
            }
            String value = (String) in.getCurrentValue();
            byte[] buf = value.getBytes();
            int len = value.length();
            int start = 0;

            for (int i = 0; i < len; i++) {
                if (buf[i] == fieldDel) {
                    readField(buf, start, i);
                    start = i + 1;
                }
            }
            // pick up the last field
            readField(buf, start, len);

            t = mTupleFactory.newTupleNoCopy(mProtoTuple);
            mProtoTuple = null;

        } catch (InterruptedException e) {
            int errCode = 6018;
            String errMsg = "Error while reading input";
            e.printStackTrace();
            throw new ExecException(errMsg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }
        return t;
    }

    private void readField(byte[] buf, int start, int end) {
        if (mProtoTuple == null) {
            mProtoTuple = new ArrayList<Object>();
        }

        if (start == end) {
            // NULL value
            mProtoTuple.add(null);
        } else {
            mProtoTuple.add(new DataByteArray(buf, start, end));
        }

    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        //return new TextInputFormat();
        return new CustomStringInputFormat();
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split)
            throws IOException {
        in = reader;
    }

}

Custom input format

public class CustomStringInputFormat extends FileInputFormat<String, String> {

    @Override
    public RecordReader<String, String> createRecordReader(InputSplit arg0,
            TaskAttemptContext arg1) throws IOException, InterruptedException {
        return new CustomStringInputRecordReader();
    }

}

Custom RecordReader

public class CustomStringInputRecordReader extends RecordReader<String, String> {

    private String fileName = null;
    private String data = null;
    private Path file = null;
    private Configuration jc = null;
    private static int count = 0;

    @Override
    public void close() throws IOException {
        // jc = null;
        // file = null;
    }

    @Override
    public String getCurrentKey() throws IOException, InterruptedException {
        return fileName;
    }

    @Override
    public String getCurrentValue() throws IOException, InterruptedException {
        return data;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        file = split.getPath();
        jc = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        InputStream is = FileSystem.get(jc).open(file);
        StringWriter writer = new StringWriter();
        IOUtils.copy(is, writer, "UTF-8");
        data = writer.toString();
        fileName = file.getName();
        writer.close();
        is.close();

        System.out.println("Count : " + ++count);

        return true;
    }

}

Best answer

Try this in your loader:

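//....

// Cast to the custom reader type (assumes `in` is the CustomStringInputRecordReader instance)
boolean notDone = ((CustomStringInputRecordReader) in).nextKeyValue();

//...

Text value = new Text(((CustomStringInputRecordReader) in).getCurrentValue().toString());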
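
It may also be worth checking how the reader signals end of input. Pig calls getNext() until it returns null, and the loader in the question returns null only when nextKeyValue() returns false, yet the posted nextKeyValue() returns true on every call. The sketch below is a Hadoop-free simulation of that contract, not the asker's code: SimpleReader, AlwaysTrueReader, OneShotReader, drain and the 1000-call cap are hypothetical names and values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class OneShotReaderSketch {

    /** Hypothetical stand-in for the two RecordReader methods the loader uses. */
    interface SimpleReader {
        boolean nextKeyValue();   // true while a new record is available
        String getCurrentValue();
    }

    /** Mirrors the reader in the question: reports a record on every call. */
    static class AlwaysTrueReader implements SimpleReader {
        private final String contents;
        AlwaysTrueReader(String contents) { this.contents = contents; }
        public boolean nextKeyValue() { return true; }
        public String getCurrentValue() { return contents; }
    }

    /** Emits the whole "file" as one record, then signals end of input. */
    static class OneShotReader implements SimpleReader {
        private final String contents;
        private boolean processed = false;
        private String value = null;
        OneShotReader(String contents) { this.contents = contents; }
        public boolean nextKeyValue() {
            if (processed) {
                value = null;
                return false;      // nothing left: lets the loader return null
            }
            value = contents;
            processed = true;
            return true;
        }
        public String getCurrentValue() { return value; }
    }

    /** Same shape as the loader's getNext(): null means "no more records". */
    static String getNext(SimpleReader reader) {
        return reader.nextKeyValue() ? reader.getCurrentValue() : null;
    }

    /** Simulates a caller that asks for records until it sees null, giving up after maxCalls. */
    static List<String> drain(SimpleReader reader, int maxCalls) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < maxCalls; i++) {
            String rec = getNext(reader);
            if (rec == null) {
                break;
            }
            records.add(rec);
        }
        return records;
    }

    public static void main(String[] args) {
        // prints 1: the one-shot reader ends the loop after a single record
        System.out.println(drain(new OneShotReader("a\tb"), 1000).size());
        // prints 1000: the always-true reader only stops at the artificial cap
        System.out.println(drain(new AlwaysTrueReader("a\tb"), 1000).size());
    }
}
```

In the actual RecordReader the same idea would be a boolean field that nextKeyValue() sets after the first read and checks on entry. Whether one record per file is the right granularity for this data is an assumption here.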

Regarding "java - Pig custom loader's getNext() is called again and again", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/26113315/
