gpt4 book ai didi

java - 如何使用 avro 在 parquet 文件模式中创建 REPEATED 类型?

转载 作者:行者123 更新时间:2023-12-02 09:44:09 26 4
gpt4 key购买 nike

我们正在创建一个数据流管道,我们将从 postgres 读取数据并将其写入 parquet 文件。 ParquetIO.Sink 允许您将 GenericRecord 的 PCollection 写入 Parquet 文件(从此处 https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO.html )。但 parquet 文件架构与我预期的不同

这是我的架构:

schema = new org.apache.avro.Schema.Parser().parse("{\n" +
" \"type\": \"record\",\n" +
" \"namespace\": \"com.example\",\n" +
" \"name\": \"Patterns\",\n" +
" \"fields\": [\n" +
" { \"name\": \"id\", \"type\": \"string\" },\n" +
" { \"name\": \"name\", \"type\": \"string\" },\n" +
" { \"name\": \"createdAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +
" { \"name\": \"updatedAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" +
" { \"name\": \"steps\", \"type\": [\"null\",{\"type\":\"array\",\"items\":{\"type\":\"string\",\"name\":\"json\"}}] },\n" +
" ]\n" +
"}");

这是我到目前为止的代码:

Pipeline p = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());

p.apply(JdbcIO.<GenericRecord> read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"org.postgresql.Driver", "jdbc:postgresql://localhost:port/database")
.withUsername("username")
.withPassword("password"))
.withQuery("select * from table limit(10)")
.withCoder(AvroCoder.of(schema))
.withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> {
GenericRecord record = new GenericData.Record(schema);
ResultSetMetaData metadata = resultSet.getMetaData();
int columnsNumber = metadata.getColumnCount();
for(int i=0; i<columnsNumber; i++) {
Object columnValue = resultSet.getObject(i+1);
if(columnValue instanceof UUID) columnValue=columnValue.toString();
if(columnValue instanceof Timestamp) columnValue=columnValue.toString();
if(columnValue instanceof PgArray) {
Object[] array = (Object[]) ((PgArray) columnValue).getArray();
List list=new ArrayList();
for (Object d : array) {
if(d instanceof PGobject) {
list.add(((PGobject) d).getValue());
}
}
columnValue = list;
}
record.put(i, columnValue);
}
return record;
}))
.apply(FileIO.<GenericRecord>write()
.via(ParquetIO.sink(schema).withCompressionCodec(CompressionCodecName.SNAPPY))
.to("something.parquet")
);

p.run();

这就是我得到的:

message com.example.table {
required binary id (UTF8);
required binary name (UTF8);
required binary createdAt (UTF8);
required binary updatedAt (UTF8);
optional group someArray (LIST) {
repeated binary array (UTF8);
}
}

这就是我所期望的:

message com.example.table {
required binary id (UTF8);
required binary name (UTF8);
required binary createdAt (UTF8);
required binary updatedAt (UTF8);
optional repeated binary someArray(UTF8);
}

请帮忙

最佳答案

我没有找到从 Avro 创建不在 GroupType 中的重复元素的方法。

Beam 中的 ParquetIO 使用 parquet-mr 项目中定义的“标准”avro 转换,该转换已实现 here

似乎有两种方法可以将 Avro ARRAY 字段转换为 Parquet 消息 - 但都不能创建您正在寻找的内容。

目前,avro 转换是与 ParquetIO 交互的唯一方式。我看到这个JIRA Use Beam schema in ParquetIO将其扩展到 Beam Rows,这可能允许不同的 Parquet 消息策略。

或者,您可以为 ParquetIO 创建 JIRA 功能请求以支持 thrift 结构,这应该允许对 parquet 结构进行更精细的控制。

关于java - 如何使用 avro 在 parquet 文件模式中创建 REPEATED 类型?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56800888/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com