java - Serialization of BigQueryIO.Write withJsonSchema


I have a BigQueryIO.Write stage in my Beam pipeline that is built via a call to .withJsonSchema(String):

inputStream.apply(
    "save-to-bigquery",
    BigQueryIO.<Event>write()
        .withJsonSchema(jsonSchema)
        .to((ValueInSingleWindow<Event> input) ->
            new TableDestination(
                "table_name$" + PARTITION_SELECTOR.print(input.getValue().getMetadata().getTimestamp()),
                null)
        )
        .withFormatFunction((Event event) ->
            new TableRow()
                .set("id", event.getMetadata().getUuid())
                .set("insertId", event.getMetadata().getUuid())
                .set("account_id", event.getAccountId())
                ...
                .set("timestamp", ISODateTimeFormat.dateHourMinuteSecondMillis()
                    .print(event.getMetadata().getTimestamp())))
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
);

I run it with the DataflowRunner, and while this stage executes I get the following error:

java.lang.IllegalArgumentException: 
com.google.api.client.json.JsonParser.parseValue(JsonParser.java:889)
com.google.api.client.json.JsonParser.parse(JsonParser.java:382)
com.google.api.client.json.JsonParser.parse(JsonParser.java:336)
com.google.api.client.json.JsonParser.parse(JsonParser.java:312)
com.google.api.client.json.JsonFactory.fromString(JsonFactory.java:187)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString(BigQueryHelpers.java:156)
org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:163)
org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:150)
org.apache.beam.sdk.io.gcp.bigquery.CreateTables$1.processElement(CreateTables.java:103)
Caused by: java.lang.IllegalArgumentException: expected collection or array type but got class com.google.api.services.bigquery.model.TableSchema
com.google.api.client.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:148)
com.google.api.client.util.Preconditions.checkArgument(Preconditions.java:69)
com.google.api.client.json.JsonParser.parseValue(JsonParser.java:723)
com.google.api.client.json.JsonParser.parse(JsonParser.java:382)
com.google.api.client.json.JsonParser.parse(JsonParser.java:336)
com.google.api.client.json.JsonParser.parse(JsonParser.java:312)
com.google.api.client.json.JsonFactory.fromString(JsonFactory.java:187)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString(BigQueryHelpers.java:156)
org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:163)
org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:150)
org.apache.beam.sdk.io.gcp.bigquery.CreateTables$1.processElement(CreateTables.java:103)
org.apache.beam.sdk.io.gcp.bigquery.CreateTables$1$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:183)
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:62)
org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
.....

It looks like the JSON is read correctly at pipeline construction/serialization time, but at execution time a deserialized representation of the JSON is passed along instead of the JSON string. I produce the JSON string by reading a resource file with Guava's Resources class:

String jsonSchema;
try {
    jsonSchema = Resources.toString(Resources.getResource("path_to_json_schema"), Charsets.UTF_8);
} catch (IOException e) {
    throw new RuntimeException("Failed to load JSON schema", e);
}
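
A quick way to test that hypothesis is to parse the string eagerly at pipeline-construction time with the same google-http-client parser that shows up in the stack trace. A minimal sketch (parseSchemaOrFail is a hypothetical helper, not part of the pipeline above):

import java.io.IOException;

import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.model.TableSchema;

// Hypothetical helper: parse the schema string up front so a malformed
// schema fails before job submission rather than on a Dataflow worker.
// A bare JSON array here surfaces the same IllegalArgumentException
// seen in the stack trace ("expected collection or array type ...").
static TableSchema parseSchemaOrFail(String jsonSchema) {
    try {
        return JacksonFactory.getDefaultInstance()
                .fromString(jsonSchema, TableSchema.class);
    } catch (IOException e) {
        throw new IllegalArgumentException("Malformed TableSchema JSON", e);
    }
}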

How can I fix this serialization problem?

Best Answer

Looking at the code that throws the exception, this appears to be a JSON parsing failure: your JSON schema is most likely malformed. According to the documentation, it should look like this:

{
  "fields": [
    {
      "name": string,
      "type": string,
      "mode": string,
      "fields": [
        (TableFieldSchema)
      ],
      "description": string
    }
  ]
}

For example:

{
  "fields": [
    {
      "name": "foo",
      "type": "INTEGER"
    },
    {
      "name": "bar",
      "type": "STRING"
    }
  ]
}

Looking at the code of the JSON parser that fails, I suspect you are missing the outer {"fields": ...} and your JSON string contains only the [...] part.
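
If that is the case, the fix is either to wrap the loaded array in the expected envelope, or to build the schema in code and pass it through withSchema(TableSchema), which BigQueryIO.Write also accepts. A sketch of both options, assuming the resource file holds only a bare fields array (the field names foo and bar are just the example above):

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;

// Option 1: wrap the bare array in the {"fields": ...} envelope that
// withJsonSchema expects. Assumes jsonSchema currently holds "[...]".
String wrapped = "{\"fields\": " + jsonSchema + "}";
// ... BigQueryIO.<Event>write().withJsonSchema(wrapped) ...

// Option 2: build the TableSchema programmatically and use withSchema(),
// sidestepping JSON string handling entirely.
TableSchema schema = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("foo").setType("INTEGER"),
        new TableFieldSchema().setName("bar").setType("STRING")));
// ... BigQueryIO.<Event>write().withSchema(schema) ...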

This entry, "java - Serialization of BigQueryIO.Write withJsonSchema", is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/46834377/
