
java - Why is my SequenceFile truncated?

Reposted · Author: 可可西里 · Updated: 2023-11-01 14:49:39

I'm learning Hadoop, and this problem has puzzled me for a while. Basically, I write a SequenceFile to disk and then read it back. However, every read ends in an EOFException. A closer look reveals that the sequence file is truncated prematurely while being written: the truncation always occurs after writing index 962, and the file always ends up at a fixed size of 45056 bytes.

I'm using Java 8 and Hadoop 2.5.1 on a MacBook Pro. In fact, I tried the same code on another Linux machine running Java 7, and the same thing happened.

I can rule out the writer/reader not being closed properly. I tried the old-style try/catch with an explicit writer.close() (as shown in the code below), as well as the newer try-with-resources approach. Neither works.

Any help would be greatly appreciated.

Here is the code I'm using:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;

import static org.apache.hadoop.io.SequenceFile.Writer.keyClass;
import static org.apache.hadoop.io.SequenceFile.Writer.stream;
import static org.apache.hadoop.io.SequenceFile.Writer.valueClass;

public class SequenceFileDemo {

    private static final String[] DATA = { "One, two, buckle my shoe",
            "Three, four, shut the door",
            "Five, six, pick up sticks",
            "Seven, eight, lay them straight",
            "Nine, ten, a big fat hen" };

    public static void main(String[] args) throws Exception {
        String uri = "file:///Users/andy/Downloads/puzzling.seq";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();

        // API change
        try {
            SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    stream(fs.create(path)),
                    keyClass(IntWritable.class),
                    valueClass(Text.class));

            for (int i = 0; i < 1024; i++) {
                key.set(i);
                value.clear();
                value.set(DATA[i % DATA.length]);

                writer.append(key, value);
                if ((i - 1) % 100 == 0) writer.hflush();
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
            }

            writer.close();

        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                    SequenceFile.Reader.file(path));
            Class<?> keyClass = reader.getKeyClass();
            Class<?> valueClass = reader.getValueClass();

            boolean isWritableSerialization = false;
            try {
                keyClass.asSubclass(WritableComparable.class);
                isWritableSerialization = true;
            } catch (ClassCastException e) {
                // key class does not use Writable serialization
            }

            if (isWritableSerialization) {
                WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
                Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
                while (reader.next(rKey, rValue)) {
                    System.out.printf("[%s] %d %s=%s\n", reader.syncSeen(), reader.getPosition(), rKey, rValue);
                }
            } else {
                // make sure io.serializations includes the serialization in use when writing the sequence file
            }

            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Best answer

I did find the bug: it's because you never close the stream created in Writer.stream(fs.create(path)).

For some reason the close does not propagate down to the stream you just created there. I suppose this is a bug, but I'm too lazy to look it up in Jira right now.
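This failure mode is not specific to Hadoop: whenever a wrapping stream's close() or flush() is skipped and only the underlying stream is closed, the wrapper's last partial buffer is silently lost and the file comes out truncated at a buffer boundary. A minimal plain-JDK sketch of the same effect (the temp-file name, buffer size, and chunk size here are arbitrary choices for illustration, not values from the question):

```java
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class TruncationDemo {

    // Writes `total` bytes through a buffered wrapper but never closes or
    // flushes the wrapper; only the raw stream is closed. Returns the number
    // of bytes that actually reached the file.
    static long writeWithoutClosingWrapper(int total) throws IOException {
        Path p = Files.createTempFile("trunc", ".dat");
        OutputStream raw = new FileOutputStream(p.toFile());
        BufferedOutputStream buffered = new BufferedOutputStream(raw, 8192);
        byte[] chunk = new byte[1000];
        for (int i = 0; i < total / 1000; i++) {
            buffered.write(chunk);
        }
        raw.close(); // closing the raw stream does NOT flush the wrapper
        return Files.size(p); // whatever was still buffered is lost
    }

    public static void main(String[] args) throws IOException {
        long onDisk = writeWithoutClosingWrapper(100_000);
        // The file holds a whole number of flushed buffers, fewer bytes
        // than were written -- analogous to the fixed 45056-byte file above.
        System.out.println("wrote 100000 bytes, file holds " + onDisk);
    }
}
```

The constant truncation size in the question (always 45056 bytes, always after the same index) fits this pattern: the bytes on disk are exactly the flushed buffers, and everything still sitting in the unclosed stream's buffer vanishes.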

One way to fix the problem is simply to use Writer.file(path) instead.

Alternatively, you can also close the created stream explicitly. Find my corrected example below:

Path path = new Path("file:///tmp/puzzling.seq");

try (FSDataOutputStream stream = fs.create(path)) {
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.stream(stream),
            Writer.keyClass(IntWritable.class), Writer.valueClass(NullWritable.class))) {

        for (int i = 0; i < 1024; i++) {
            writer.append(new IntWritable(i), NullWritable.get());
        }
    }
}

try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(path))) {
    Class<?> keyClass = reader.getKeyClass();
    Class<?> valueClass = reader.getValueClass();

    WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
    Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
    while (reader.next(rKey, rValue)) {
        System.out.printf("%s = %s\n", rKey, rValue);
    }
}

Regarding "java - Why is my SequenceFile truncated?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/27916872/
