
java - How to read and write Parquet files efficiently?


I am working on a utility that reads several Parquet files at once and writes them into a single output file. The implementation is straightforward: the utility reads the Parquet files from a directory, reads the Groups from every file into a list, and then writes all of those groups into one file using a ParquetWriter.
After reading about 600 MB it throws a Java heap space out-of-memory error. It also takes 15-20 minutes to read and write 500 MB of data.

Is there a way to make this operation more efficient?

The read method looks like this:

ParquetFileReader reader = new ParquetFileReader(conf, path, ParquetMetadataConverter.NO_FILTER);
ParquetMetadata readFooter = reader.getFooter();
MessageType schema = readFooter.getFileMetaData().getSchema();
ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
reader.close();
PageReadStore pages = null;
try {
    while (null != (pages = r.readNextRowGroup())) {
        long rows = pages.getRowCount();
        System.out.println("Number of rows: " + pages.getRowCount());

        MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
        RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
        for (int i = 0; i < rows; i++) {
            Group g = (Group) recordReader.read();
            //printGroup(g);
            groups.add(g);
        }
    }
} finally {
    System.out.println("close the reader");
    r.close();
}

The write method looks like this:

for (Path file : files) {
    groups.addAll(readData(file));
}

System.out.println("Number of groups from the parquet files " + groups.size());

Configuration configuration = new Configuration();
Map<String, String> meta = new HashMap<String, String>();
meta.put("startkey", "1");
meta.put("endkey", "2");
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
        new Path(outputFile),
        new GroupWriteSupport(),
        CompressionCodecName.SNAPPY,
        2147483647,   // block (row group) size in bytes
        268435456,    // page size in bytes
        134217728,    // dictionary page size in bytes
        true,         // enable dictionary encoding
        false,        // disable schema validation
        ParquetProperties.WriterVersion.PARQUET_2_0,
        configuration);
System.out.println("Number of groups to write:" + groups.size());
for (Group g : groups) {
    writer.write(g);
}
writer.close();
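
(For reference, below is a minimal sketch of the same copy loop restructured so that each record is written as soon as it is read, rather than being collected into groups first. It reuses files, schema, outputFile and configuration from the snippets above, and sizes the writer with ParquetWriter's default block and page sizes, so the settings are illustrative only.)

// Sketch: stream records straight into the writer instead of buffering every Group in memory.
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
        new Path(outputFile),
        new GroupWriteSupport(),
        CompressionCodecName.SNAPPY,
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        true,
        false,
        ParquetProperties.WriterVersion.PARQUET_2_0,
        configuration);

for (Path file : files) {
    ParquetFileReader r = new ParquetFileReader(configuration, file, ParquetMetadataConverter.NO_FILTER);
    try {
        PageReadStore pages;
        while (null != (pages = r.readNextRowGroup())) {
            long rows = pages.getRowCount();
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
            RecordReader<Group> recordReader =
                    columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
            for (long i = 0; i < rows; i++) {
                writer.write(recordReader.read());   // write immediately, keep nothing around
            }
        }
    } finally {
        r.close();
    }
}
writer.close();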

Best Answer

I use these functions to merge Parquet files, but they are in Scala. They may give you a good starting point anyway.

import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter}
import org.apache.parquet.hadoop.util.{HadoopInputFile, HadoopOutputFile}
import org.apache.parquet.schema.MessageType

import scala.collection.JavaConverters._

object ParquetFileMerger {
  def mergeFiles(inputFiles: Seq[Path], outputFile: Path): Unit = {
    val conf = new Configuration()
    val mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles.asJava, conf).getFileMetaData
    val writer = new ParquetFileWriter(conf, mergedMeta.getSchema, outputFile, ParquetFileWriter.Mode.OVERWRITE)

    writer.start()
    inputFiles.foreach(input => writer.appendFile(HadoopInputFile.fromPath(input, conf)))
    writer.end(mergedMeta.getKeyValueMetaData)
  }

  def mergeBlocks(inputFiles: Seq[Path], outputFile: Path): Unit = {
    val conf = new Configuration()
    val parquetFileReaders = inputFiles.map(getParquetFileReader)
    val mergedSchema: MessageType =
      parquetFileReaders.
        map(_.getFooter.getFileMetaData.getSchema).
        reduce((a, b) => a.union(b))

    val writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outputFile, conf), mergedSchema, ParquetFileWriter.Mode.OVERWRITE, 64 * 1024 * 1024, 8388608)

    writer.start()
    parquetFileReaders.foreach(_.appendTo(writer))
    writer.end(new util.HashMap[String, String]())
  }

  def getParquetFileReader(file: Path): ParquetFileReader = {
    ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))
  }
}
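
For a Java starting point, the first variant can be sketched roughly as follows. This is only an approximation of the Scala code above: ParquetFileMergerJava is a made-up name, and it takes the schema from the first input file (assuming all inputs share one schema) instead of calling mergeMetadataFiles.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class ParquetFileMergerJava {

    // Copies whole row groups from every input file into one output file.
    // Records are never decoded into Group objects, so memory use stays flat.
    public static void mergeFiles(List<Path> inputFiles, Path outputFile) throws IOException {
        Configuration conf = new Configuration();

        // Assumption: every input file has the same schema, so the first footer is enough.
        MessageType schema;
        try (ParquetFileReader first =
                ParquetFileReader.open(HadoopInputFile.fromPath(inputFiles.get(0), conf))) {
            schema = first.getFooter().getFileMetaData().getSchema();
        }

        ParquetFileWriter writer = new ParquetFileWriter(
                conf, schema, outputFile, ParquetFileWriter.Mode.OVERWRITE);
        writer.start();
        for (Path input : inputFiles) {
            writer.appendFile(HadoopInputFile.fromPath(input, conf));
        }
        writer.end(new HashMap<String, String>());
    }
}

Like the Scala version, this only rewrites metadata and copies the existing row groups via appendFile, which is why it avoids both the heap pressure and the re-encoding cost of reading and rewriting Groups one by one.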

Regarding "java - How to read and write Parquet files efficiently?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51328393/
