gpt4 book ai didi

java - 为什么 SequenceFile writer 的附加操作会用最后一个值覆盖所有值?

转载 作者:可可西里 更新时间:2023-11-01 16:48:24 26 4
gpt4 key购买 nike

首先,考虑这个 CustomWriter 类:

public final class CustomWriter {

private final SequenceFile.Writer writer;

CustomWriter(Configuration configuration, Path outputPath) throws IOException {
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
}

writer = SequenceFile.createWriter(configuration,
SequenceFile.Writer.file(outputPath),
SequenceFile.Writer.keyClass(LongWritable.class),
SequenceFile.Writer.valueClass(ItemWritable.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec()),
SequenceFile.Writer.blockSize(1024 * 1024),
SequenceFile.Writer.bufferSize(fileSystem.getConf().getInt("io.file.buffer.size", 4 * 1024)),
SequenceFile.Writer.replication(fileSystem.getDefaultReplication(outputPath)),
SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
}

public void close() throws IOException {
writer.close();
}

public void write(Item item) throws IOException {
writer.append(new LongWritable(item.getId()), new ItemWritable(item));
}
}

我想做的是消费 Item 类型对象的异步流。消费者有一个对 CustomWriter 实例的引用。然后,它会为收到的每个项目调用 CustomWriter#write 方法。当流结束时,调用 CustomWriter#close 方法关闭编写器。

如您所见,我只创建了一个 writer,它开始附加到一个全新的文件。所以,毫无疑问 this不是原因。

我还应该注意,我目前正在使用 MiniDFSCluster 按照说明在单元测试环境中运行它 here .如果我在非单元测试环境中运行它(即没有 MiniDFSCluster),它似乎工作得很好。

当我尝试读回文件时,我看到的只是最后写入的 Item 对象 N 次(其中 N 是流中接收到的项目总数)。这是一个例子:

sparkContext.hadoopFile(path, SequenceFileInputFormat.class, LongWritable.class, ItemWritable.class)
.collect()
.forEach(new BiConsumer<>() {
@Override
public void accept(Tuple2<LongWritable, ItemWritable> tuple) {
LongWritable id = tuple._1();
ItemWritable item = tuple._2();
System.out.print(id.get() + " -> " + item.get());
}
});

这将打印如下内容:

...
1234 -> Item[...]
1234 -> Item[...]
1234 -> Item[...]
...

是我做错了什么,还是使用 MiniDFSCluster 的副作用?

最佳答案

Writable(如LongWritable, ItemWritable)在处理数据时被重用。当接收到一条记录时,Writable 通常只是替换它的内容,您将只接收到相同的Writable 对象。如果你想将它们收集到一个数组中,你应该将它们复制到一个新对象中。

关于java - 为什么 SequenceFile writer 的附加操作会用最后一个值覆盖所有值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34684845/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com