gpt4 book ai didi

java - Apache Beam 从 GCS 读取 Avro 文件并写入 BigQuery

转载 作者:行者123 更新时间:2023-12-01 16:27:01 26 4
gpt4 key购买 nike

运行 java 作业来读取 Avro 文件,但出现错误。寻求有关此问题的帮助 -

这是代码-

// Get Avro Schema
String schemaJson = getSchema(options.getAvroSchema());
Schema schema = new Schema.Parser().parse(schemaJson);

// Check schema field types before starting the Dataflow job
checkFieldTypes(schema);

// Create the Pipeline object with the options we defined above.
Pipeline pipeline = Pipeline.create(options);
String bqStr = getBQString(options);
// TableSchema ts = BigQueryAvroUtils.getTableSchema(User.SCHEMA$);
// Convert Avro To CSV
PCollection<GenericRecord> records =
pipeline.apply(
"Read Avro files",
AvroIO.readGenericRecords(schema)
.from(options.getInputFile()));

records
.apply(
"Convert Avro to CSV formatted data",
ParDo.of(new ConvertAvroToCsv(schemaJson, options.getCsvDelimiter())))
.apply(
"Write CSV formatted data",
TextIO.write().to(options.getOutput())
.withSuffix(".csv"));

records.apply(
"Write to BigQuery",
BigQueryIO.write()
.to(bqStr)
.withJsonSchema(schemaJson)
.withWriteDisposition(WRITE_APPEND)
.withCreateDisposition(CREATE_IF_NEEDED)
.withFormatFunction(TABLE_ROW_PARSER));
// [END bq_write]

这是我看到的错误 -

2020-06-01 13:14:41 ERROR MonitoringUtil$LoggingHandler:99 - 2020-06-01T07:44:39.240Z: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord
at com.example.AvroToCsv$1.apply(AvroToCsv.java:1)
at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:76)

2020-06-01 13:14:52 ERROR MonitoringUtil$LoggingHandler:99 - 2020-06-01T07:44:48.956Z: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord
at com.example.AvroToCsv$1.apply(AvroToCsv.java:1)
at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:76)

2020-06-01 13:15:03 ERROR MonitoringUtil$LoggingHandler:99 - 2020-06-01T07:44:58.811Z: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord
at com.example.AvroToCsv$1.apply(AvroToCsv.java:1)
at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:76)

2020-06-01 13:15:15 ERROR MonitoringUtil$LoggingHandler:99 - 2020-06-01T07:45:10.673Z: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord
at com.example.AvroToCsv$1.apply(AvroToCsv.java:1)
at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:76)

最佳答案

错误出现在您的 TABLE_ROW_PARSER 函数中。它似乎正在将 Avro GenericRecord 转换为 SpecificRecord。

PrepareWrite 中失败的行是 here 。该行调用您提供的格式函数。格式函数必须将每个输入元素转换为 JSON TableRow。为了提高效率,使用 withAvroFormatFunction 可能会更好。

关于java - Apache Beam 从 GCS 读取 Avro 文件并写入 BigQuery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62128280/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com