- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我的 Beam 管道中有一个 BigQueryIO.Write 阶段,它是通过调用 .withJsonSchema(String)
构建的:
inputStream.apply(
"save-to-bigquery",
BigQueryIO.<Event>write()
.withJsonSchema(jsonSchema)
.to((ValueInSingleWindow<Event> input) ->
new TableDestination(
"table_name$" + PARTITION_SELECTOR.print(input.getValue().getMetadata().getTimestamp()),
null)
)
.withFormatFunction((ConsumerApiRequest event) ->
new TableRow()
.set("id", event.getMetadata().getUuid())
.set("insertId", event.getMetadata().getUuid())
.set("account_id", event.getAccountId())
...
.set("timestamp", ISODateTimeFormat.dateHourMinuteSecondMillis()
.print(event.getMetadata().getTimestamp())))
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
);
我通过 DataflowRunner
运行它,在执行这个阶段时我收到以下错误:
java.lang.IllegalArgumentException:
com.google.api.client.json.JsonParser.parseValue(JsonParser.java:889)
com.google.api.client.json.JsonParser.parse(JsonParser.java:382)
com.google.api.client.json.JsonParser.parse(JsonParser.java:336)
com.google.api.client.json.JsonParser.parse(JsonParser.java:312)
com.google.api.client.json.JsonFactory.fromString(JsonFactory.java:187)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString(BigQueryHelpers.java:156)
org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:163)
org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:150)
org.apache.beam.sdk.io.gcp.bigquery.CreateTables$1.processElement(CreateTables.java:103)
Caused by: java.lang.IllegalArgumentException: expected collection or array type but got class com.google.api.services.bigquery.model.TableSchema
com.google.api.client.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:148)
com.google.api.client.util.Preconditions.checkArgument(Preconditions.java:69)
com.google.api.client.json.JsonParser.parseValue(JsonParser.java:723)
com.google.api.client.json.JsonParser.parse(JsonParser.java:382)
com.google.api.client.json.JsonParser.parse(JsonParser.java:336)
com.google.api.client.json.JsonParser.parse(JsonParser.java:312)
com.google.api.client.json.JsonFactory.fromString(JsonFactory.java:187)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString(BigQueryHelpers.java:156)
org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:163)
org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantSchemaDestinations.getSchema(DynamicDestinationsHelpers.java:150)
org.apache.beam.sdk.io.gcp.bigquery.CreateTables$1.processElement(CreateTables.java:103)
org.apache.beam.sdk.io.gcp.bigquery.CreateTables$1$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:183)
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:62)
org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown Source)
.....
似乎在管道创建/序列化时正确读取了 JSON,但在执行时传递的是反序列化的 JSON 表示,而不是 JSON 字符串。我通过 Guava Resources
类读取资源文件生成 JSON 字符串:
String jsonSchema;
try {
jsonSchema = Resources.toString(Resources.getResource("path_to_json_schema"), Charsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException("Failed to load JSON schema", e);
}
我该如何解决这个序列化问题?
最佳答案
查看引发异常的代码,这似乎是 JSON 解析失败 - 您的 JSON 架构很可能格式不正确。根据the documentation ,它应该看起来像这样:
{
"fields": [
{
"name": string,
"type": string,
"mode": string,
"fields": [
(TableFieldSchema)
],
"description": string
}
]
}
例如:
{
"fields": [
{
"name": "foo",
"type": "INTEGER"
},
{
"name": "bar",
"type": "STRING",
}
]
}
查看失败的 JSON 解析器的代码,我怀疑您缺少外部 {"fields": ...}
并且您的 JSON 字符串仅包含 [...]
。
关于java - BigQueryIO.Write withJsonSchema 的序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46834377/
我正在执行 UPDATE .WRITE() 语句,并发现它显然只有在您像这样定义它时才有效: string sql = "UPDATE [dbo].[Table] SET [Column].WRITE
我在 Unix 系统上用 C 编程。我知道: write(fd,"ABCD",4); 比这样做更好: write(fd, "A", 1); write(fd, "B", 1); write(fd, "
func hash(s string) uint32 { h := fnv.New32a() h.Write([]byte(s)) return h.Sum32() } 对于这
在经典的 asp 页面中,有人告诉我您可以使用 vbscript 或 jscript。而 jscript 就是 javascript。 所以我不确定 Response.Write、Response.W
当 openssl 子进程尝试 write() 到本地目录时,我收到此错误。在调用 write() 之前连接已关闭。它没有与 ssl 连接,因为我什至无法从 nodejs 文档启动示例代码。 我错过了
最近我在试验netty。我遇到了以下问题: ctx.channel().write(new TextWebSocketFrame("hello")) 没有在客户端返回 hello,但是 ctx.cha
请解释以下内容: def feed(data): import os print "DATA LEN: %s" % len(data) f = open("copy", "w") f.
有什么区别debug.write 和 Trace.write ?每个应该什么时候使用? 最佳答案 在典型的发布构建配置中,Debug class 被禁用并且什么都不做。 Trace但是,仍然可以在发行
我只是想知道,就性能而言,哪个更好(我在 FileStream 中使用 StreamWriter): 多次调用 Stream.Write(): StreamWriter sw = new Stream
我发现自己写给 stringwriter,然后在函数末尾执行 resp.Write(sw.ToString())。这是不必要的吗?如果我多次使用 HttpResponse.Write,即使我的页面是
我正在尝试通过 JavaScript 文件从 electron 打开一个新窗口,它可以工作,并打开了新窗口,但我无法将 HTML/文本写入新文件。我收到那个错误: Cannot read proper
我们对 QIODevice::write 的一般行为和具体的 QTcpSocket 实现感到非常困惑。有一个 similar question已经,但答案并不令人满意。主要的混淆源于分别提到的 byt
我知道这听起来像是一个愚蠢的问题: write(*,*) 和 write(6,*) ?我在我研究所的 super 计算机上运行一个复杂的代码,它通过一个不同于 6 的单元号输出一个数据文件,显然编译的
我有一个结构体,它可以通过一系列复杂的方法调用转换为文本,其中包含大量 write!调用。此文本可以写入文件或调试日志。我正在决定是否使用 fmt::Write 或 io::Write .我不能真正使
已关闭。这个问题是 not reproducible or was caused by typos 。目前不接受答案。 这个问题是由拼写错误或无法再重现的问题引起的。虽然类似的问题可能是 on-top
In the C standard library, an output can't be followed by an input and vice versa. 对于Linux API,可以在re
我希望能够为一件事做 document.write。然后延迟半秒,然后再记录。写一些。你知道这是否可能吗?而且,如果是这样,怎么办?到目前为止,我已经尝试过了,但没有奏效: document.writ
为什么通过 onclick 属性调用的 write() 函数解析为 document.write() 并替换文档?有什么办法可以阻止这种情况发生吗? Write Function Alternat
我想创建一个包含多个“页面”的文本文件,并将每个页面的字节偏移量记录在一个单独的文件中。为此,我将字符串打印到主输出文件并使用 bytes_written += file.write(str) 计算字
关闭。这个问题需要更多focused .它目前不接受答案。 想改进这个问题吗? 更新问题,使其只关注一个问题 editing this post . 关闭 8 年前。 Improve this qu
我是一名优秀的程序员,十分优秀!