
java - Reading Parquet data from a ByteArrayOutputStream instead of a file


I want to convert this code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public class ParquetReaderUtils {

    public static Parquet getParquetData(String filePath) throws IOException {
        List<SimpleGroup> simpleGroups = new ArrayList<>();
        ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(filePath), new Configuration()));
        MessageType schema = reader.getFooter().getFileMetaData().getSchema();
        //List<Type> fields = schema.getFields();
        PageReadStore pages;
        while ((pages = reader.readNextRowGroup()) != null) {
            long rows = pages.getRowCount();
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
            RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));

            for (int i = 0; i < rows; i++) {
                SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
                simpleGroups.add(simpleGroup);
            }
        }
        reader.close();
        return new Parquet(simpleGroups, schema);
    }
}

(from https://www.arm64.ca/post/reading-parquet-files-java/ )

so that it takes a ByteArrayOutputStream argument instead of a filePath.

Is this possible? I don't see a ParquetStreamReader in org.apache.parquet.hadoop.

Any help is appreciated. I'm trying to write a test application for Parquet data coming from Kafka, and writing each of the many messages out to a file is rather slow.

Best Answer

So, without deeper testing, I would try this class (though the contents of the output stream must be Parquet-compatible). I put a streamId in there to make it easier to identify which byte array was processed (ParquetFileReader prints instance.toString() if something goes wrong).

import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class ParquetStream implements InputFile {
    private final String streamId;
    private final byte[] data;

    // ByteArrayInputStream already holds everything in memory; this subclass
    // only exposes its protected pos field so the stream can be repositioned.
    private static class SeekableByteArrayInputStream extends ByteArrayInputStream {
        public SeekableByteArrayInputStream(byte[] buf) {
            super(buf);
        }

        public void setPos(int pos) {
            this.pos = pos;
        }

        public int getPos() {
            return this.pos;
        }
    }

    public ParquetStream(String streamId, ByteArrayOutputStream stream) {
        this.streamId = streamId;
        this.data = stream.toByteArray();
    }

    @Override
    public long getLength() throws IOException {
        return this.data.length;
    }

    @Override
    public SeekableInputStream newStream() throws IOException {
        // Parquet only needs seek() and getPos() on top of a plain InputStream.
        return new DelegatingSeekableInputStream(new SeekableByteArrayInputStream(this.data)) {
            @Override
            public void seek(long newPos) throws IOException {
                ((SeekableByteArrayInputStream) this.getStream()).setPos((int) newPos);
            }

            @Override
            public long getPos() throws IOException {
                return ((SeekableByteArrayInputStream) this.getStream()).getPos();
            }
        };
    }

    @Override
    public String toString() {
        return "ParquetStream[" + streamId + "]";
    }
}
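
With this class, the reader code from the question only needs its ParquetFileReader.open(...) call swapped, since open() also accepts an InputFile. A minimal sketch under that assumption, reusing the Parquet/SimpleGroup plumbing from the question (the "kafka-test" stream id is an arbitrary, hypothetical label):

public static Parquet getParquetData(ByteArrayOutputStream stream) throws IOException {
    List<SimpleGroup> simpleGroups = new ArrayList<>();
    // The in-memory ParquetStream replaces HadoopInputFile.fromPath(...):
    // ParquetFileReader.open(...) accepts any org.apache.parquet.io.InputFile.
    ParquetFileReader reader = ParquetFileReader.open(new ParquetStream("kafka-test", stream));
    MessageType schema = reader.getFooter().getFileMetaData().getSchema();
    PageReadStore pages;
    while ((pages = reader.readNextRowGroup()) != null) {
        long rows = pages.getRowCount();
        MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
        RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
        for (int i = 0; i < rows; i++) {
            simpleGroups.add((SimpleGroup) recordReader.read());
        }
    }
    reader.close();
    return new Parquet(simpleGroups, schema);
}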

Regarding "java - Reading Parquet data from a ByteArrayOutputStream instead of a file", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58141248/
