- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 avro 架构编写 Parquet 文件。但总是遇到这个问题。
Exception in thread "main" java.lang.NoSuchFieldError: DEFAULT_WRITER_VERSION
at org.apache.parquet.hadoop.ParquetWriter.<clinit>(ParquetWriter.java:46)
at com.ice.practice.AvroToParquet.main(AvroToParquet.java:52)
我的示例程序如下:我创建了一个 avro 模式,然后将其转换为 parquet 模式,然后在 parquewriter 的帮助下,我尝试使用 GenericRecords。
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.avro.*;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class AvroToParquet {
public static void main(String[] args) throws IOException {
Schema aSchema = new Schema.Parser().parse(new File("d:\\emp.avsc"));
List<GenericData.Record> SourceRecords = new ArrayList<>();
int NoOfRecords = 10;
int NoOfColumns = 3;
for(int i=0;i<NoOfRecords;i++)
{
GenericData.Record recordHolder = new GenericData.Record(aSchema);
recordHolder.put("name", "emp"+i);
recordHolder.put("salary", (10000+(i*1000))+"");
recordHolder.put("dept", "java"+i);
SourceRecords.add(recordHolder);
}
MessageType pSchema = new AvroSchemaConverter().convert(aSchema);
@SuppressWarnings("deprecation")
AvroWriteSupport<GenericRecord> wSupport = new AvroWriteSupport<>(pSchema, aSchema);
CompressionCodecName cCodeName = CompressionCodecName.SNAPPY;
int blockSize = 256 * 1024 * 1024;
int pageSize = 64 * 1024;
Path outputPath = new Path("d:\\emp.parquet");
@SuppressWarnings("deprecation")
ParquetWriter<GenericRecord> pWriter = new ParquetWriter<GenericRecord>(outputPath,wSupport,cCodeName,blockSize,pageSize) {
};
for(GenericRecord record : SourceRecords)
{
pWriter.write(record);
}
pWriter.close();
}
}
avro 架构:
"type":"record",
"name":"employee",
"namespace":"ice.report",
"fields":[
{
"name":"name",
"type":"string"
},
{
"name":"salary",
"type":"string"
},
{
"name":"dept",
"type":"string"
}
]
}
请告诉我如何解决这个问题。
最佳答案
我建议您不要使用过时的构造函数。事实上,它们被弃用是有原因的。相反,尝试 AvroParquetReader
和 AvroParquetWriter
类。详细解释请引用this线。同时,我向您建议以下解决方案:
Java 代码: ParquetAvroHandler.java
package com.parquet.avro;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
public class ParquetAvroHandler
{
private static final Schema SCHEMA;
private static final String SCHEMA_PATH = "path/to/your/schema.avsc";
private static final Path OUTPUT_PATH = new Path("result.parquet");
private static final Logger LOGGER = LoggerFactory.getLogger(ParquetAvroHandler.class);
static
{
try (InputStream inStream = ParquetAvroHandler.class.getResourceAsStream(SCHEMA_PATH))
{
SCHEMA = new Schema.Parser().parse(IOUtils.toString(inStream, "UTF-8"));
}
catch (Exception e)
{
LOGGER.error("Can't read SCHEMA file from {}", SCHEMA_PATH);
LOGGER.error(e.getLocalizedMessage());
throw new RuntimeException("Can't read SCHEMA file from " + SCHEMA_PATH, e);
}
}
/**
* Reads an existing Apache Avro-based Parquet file from the
* specified location and prints it into the system console
*
* @param filePath path to the input file
* @throws IOException
**/
public void read(Path filePath) throws IOException
{
Configuration configuration = new Configuration();
HadoopInputFile inputFile = HadoopInputFile.fromPath(filePath, configuration);
try (ParquetReader<GenericData.Record> reader = AvroParquetReader
.<GenericData.Record>builder(inputFile)
.withConf(configuration)
.build())
{
GenericData.Record record;
while ((record = reader.read()) != null)
{
System.out.println(record);
}
}
}
/**
* Creates a new Apache Avro-based Parquet file or overwrites the existing one
*
* @param records set of records to write to the file
* @param filePath path to the output file
* @throws IOException
**/
public void write(List<GenericData.Record> records, Path filePath) throws IOException
{
try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
.<GenericData.Record>builder(filePath)
.withSchema(SCHEMA)
.withConf(new Configuration())
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build())
{
for (GenericData.Record record : records)
{
writer.write(record);
}
}
}
public static void main(String[] args) //throws IOException
{
try
{
GenericData.Record record = new GenericData.Record(SCHEMA);
record.put("Name", "John");
record.put("Id", 1);
record.put("PhoneNumber", "555-555-5551");
record.put("ZipCode", 88888);
record.put("isAlive", true);
records.add(record);
record = new GenericData.Record(SCHEMA);
record.put("Name", "Jane");
record.put("Id", 2);
record.put("PhoneNumber", "555-555-5552");
record.put("ZipCode", 99999);
record.put("isAlive", false);
records.add(record);
ParquetAvroHandler handler = new ParquetAvroHandler();
handler.write(records, OUTPUT_PATH);
handler.read(OUTPUT_PATH);
}
catch (Exception e)
{
LOGGER.error(e.getMessage());
e.printStackTrace();
}
}
}
Avro 架构: schema.avsc
{
"namespace": "example.avro",
"type": "record",
"name": "org.apache.avro.file.Header",
"fields":
[
{"name": "Name", "type": "string"},
{"name": "Id", "type": ["int", "null"]},
{"name": "PhoneNumber", "type": ["string", "null"]},
{"name": "ZipCode", "type": ["int", "null"]},
{"name": "isAlive", "type": "boolean"}
]
}
POM 文件: pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<version>1.10.0</version>
<name>Sample Name</name>
<description>Sample Description</description>
<dependencies>
<!-- Generic -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<!-- Avro & Hadoop -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-encoding</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-format</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.10.0</version>
</dependency>
</dependencies>
</project>
日志配置: log4j.properties
# Root logger option
log4j.rootLogger=INFO, file, console
# Direct log messages to a log file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=\systemlog.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# Direct log messages to console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
关于java - 写入 Parquet 文件时出现问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52038776/
是否可以对 parquet 格式执行分布式并发写入? 是否可以在写入拼花文件时读取它们? 如果有并发读/写的方法,我有兴趣了解。 提前感谢您的帮助。 最佳答案 我最终得到了 Parquet 开发人员的
如何从命令行检查 Parquet 文件的内容? 我现在看到的唯一选择是 $ hadoop fs -get my-path local-file $ parquet-tools head local-f
我正在使用基于 Java(1.8) 的应用程序使用库创建 Parquet 文件 org.apache.avro.Schema 和 org.apache.parquet.hadoop.ParquetWr
我已经使用 pyspark 创建了多个 parquet 文件,现在我正在尝试将所有 parquet 文件合并为 1 个。我能够合并这些文件,但是在读取生成的文件时,我遇到了错误。以前有人遇到过这个问题
我创建了一个数据框,如下所示: expanded_1 = pd.DataFrame({"Point": [random.choice(points) for x in range(30000000)]
当我在 R 和 Python 中保存 Parquet 文件(使用 pyarrow)时,我得到一个保存在元数据中的箭头模式字符串。 如何读取元数据?它是 Flatbuffer 编码数据吗?架构的定义在哪
例如,pandas 的 read_csv有一个 chunk_size允许 read_csv 的参数在 CSV 文件上返回一个迭代器,以便我们可以分块读取它。 Parquet 格式以块的形式存储数据,但
我正在尝试运行最新版本的 Parquet 工具,但遇到了一些问题。出于某种原因org.apache.hadoop.conf.Configuration不在阴影的 jar 里。 (我对 v1.6.0 也
我正在使用 Parquet 框架来编写 Parquet 文件。 我使用此构造函数创建了 Parquet 作家- public class ParquetBaseWriter extends Parqu
使用 spark 和钻头,我可以查询本地 Parquet 文件。 presto 是否提供相同的功能? 换句话说,是否可以使用 presto 查询本地 Parquet 文件 - 无需通过 HDFS 或
我有一个加密的 parquet 数据文件,它被读取为一个输入流。我想从此输入流中提取单个 Parquet 记录。有什么办法可以做到这一点吗?在 avro 中,使用 DatumReader 是可能的。我
我知道 Apache Arrow Parquet 可以读取符合规范的 Delta 编码文件,但不能将它们写出。我想知道是否有任何常用的开源 C++/Python 库可以写出符合 Parquet 规范的
背景: DuckDB 允许直接查询 parquet 文件。例如con.execute("从'Hierarchy.parquet'中选择 *) Parquet 允许按列值对文件进行分区。当一个 Parq
有没有办法将一个巨大的 parquet 文件分成较小的文件(使用 Python)?保留所有列并划分行?谢谢 最佳答案 你可以用 dask 来做. import dask.dataframe as dd
我的 Parquet 文件为 800K 行 x 8.7K 列。我将其加载到 dask 数据框中: import dask.dataframe as dd dask_train_df = dd.read
我有数百个用 PyArrow 创建的 Parquet 文件。然而,其中一些文件的字段/列的名称(我们称其为 Orange)与原始列(称其为 Sporange)略有不同,因为其中一个使用了查询的变体。否
我正在尝试在配置单元中创建 Parquet 表。我可以创建它,但是当我运行 analyze table mytable compute statistics 时;我得到这个结果: numfiles=8
我知道 hdfs 会将文件拆分成大约 64mb 的 block 。我们有流式传输的数据,我们可以将它们存储到大文件或中等大小的文件中。列式文件存储的最佳大小是多少?如果我可以将文件存储到最小列为 64
我想使用 Apache 的 parquet-mr 项目通过 Java 以编程方式读取/写入 Parquet 文件。我似乎找不到任何有关如何使用此 API 的文档(除了查看源代码并查看它的使用方式)——
我在 Impala 中移动数据,而不是我的设计,我丢失了一些数据。我需要将数据从 Parquet 表复制回它们原来的非 Parquet 表。最初,开发人员使用脚本中的一个简单的一行来完成此操作。由于我
我是一名优秀的程序员,十分优秀!