- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我已经下载了 Pub/Sub to BigQuery Dataflow template 的副本来自Google's github repository 。我正在本地计算机上使用 direct-runner 运行它.
在测试中,我确认,如果在 UDF 处理或从 JSON 到 TableRow 的转换期间发生错误,模板只会将失败写入“死信”表。
我还希望更优雅地处理插入 BigQuery 时发生的故障,方法是将它们发送到单独的 TupleTag 中,以便它们也可以发送到死信表或其他输出以进行审查和处理。目前,当使用 dataflow-runner 执行时这些错误只会写入 Stackdriver 日志,并继续无限期地重试,直到问题得到解决。
问题一:在本地测试并发布格式与目标表架构不匹配的消息时,会重试插入 5 次,然后管道崩溃,并出现 RuntimeException 以及从对 Google API 的 HTTP 响应。我相信这种行为是在BigQueryServices.Impl内设置的这里:
private static final FluentBackoff INSERT_BACKOFF_FACTORY =
FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
但是,基于Google's documentation ,
"When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall."
作为 Beam 的 Pub/Sub.IO ,
create and consume unbounded PCollections
我的印象是,从 Pub/Sub 读取时应默认启用流模式。我什至在 writeTableRows() 调用中添加了 Streaming_Inserts 方法,但它并没有影响此行为。
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withMethod(Method.STREAMING_INSERTS)
问题二:
我问这个问题是因为我不知道如何在不创建自己的静态类(覆盖扩展方法并使用 ParDo 和 DoFn )的情况下捕获与插入相关的错误,我可以在其中添加自己的自定义逻辑来为成功记录创建单独的 TupleTags和故障记录,类似于 JavascriptTextTransformer 中的操作方式对于 FailsafeJavascriptUdf。
更新:
public static PipelineResult run(DirectOptions options) {
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
// Register the coder for pipeline
FailsafeElementCoder<PubsubMessage, String> coder =
FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
PCollectionTuple transformOut =
pipeline
//Step #1: Read messages in from Pub/Sub
.apply(
"ReadPubsubMessages",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
//Step #2: Transform the PubsubMessages into TableRows
.apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
WriteResult writeResult = null;
try {
writeResult =
transformOut
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withMethod(Method.STREAMING_INSERTS)
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to("myproject:MyDataSet.MyTable"));
} catch (Exception e) {
System.out.print("Cause of the Standard Insert Failure is: ");
System.out.print(e.getCause());
}
try {
writeResult
.getFailedInserts()
.apply(
"WriteFailedInsertsToDeadLetter",
BigQueryIO.writeTableRows()
.to(options.getOutputDeadletterTable())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
} catch (Exception e) {
System.out.print("Cause of the Error Insert Failure is: ");
System.out.print(e.getCause());
}
PCollectionList.of(transformOut.get(UDF_DEADLETTER_OUT))
.and(transformOut.get(TRANSFORM_DEADLETTER_OUT))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(getDeadletterTableSchemaJson())
.build());
return pipeline.run();
}
错误:
Cause of the Error Insert Failure is: null[WARNING]
java.lang.NullPointerException: Outputs for non-root node WriteFailedInsertsToDeadLetter are null
at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:672)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at com.google.cloud.teleport.templates.PubSubToBigQuery.run(PubSubToBigQuery.java:312)
at com.google.cloud.teleport.templates.PubSubToBigQuery.main(PubSubToBigQuery.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
最佳答案
在最新版本的 Beam 中,BigQueryIO.Write转换返回 WriteResult对象,使您能够检索未能输出到 BigQuery 的 TableRows 的 PCollection。使用此功能,您可以轻松检索失败,将其格式化为死信输出的结构,然后将记录重新提交到 BigQuery。这样就不需要单独的类来管理成功和失败的记录。
下面是您的管道的示例。
// Attempt to write the table rows to the output table.
WriteResult writeResult =
pipeline.apply(
"WriteRecordsToBigQuery",
BigQueryIO.writeTableRows()
.to(options.getOutputTable())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
/*
* 1) Get the failed inserts
* 2) Transform to the deadletter table format.
* 3) Output to the deadletter table.
*/
writeResult
.getFailedInserts()
.apply("FormatFailedInserts", ParDo.of(new FailedInsertFormatter()))
.apply(
"WriteFailedInsertsToDeadletter",
BigQueryIO.writeTableRows()
.to(options.getDeadletterTable())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
此外,回答您的问题:
流
对于 DirectRunner,将选项设置为 true
。关于google-bigquery - Apache Beam/Google Dataflow PubSub 到 BigQuery 管道 : Handling Insert Errors and Unexpected Retry Behavior,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52044349/
reqwest v0.9 将 serde v1.0 作为依赖项,因此实现 converting serde_json errors into reqwest error . 在我的代码中,我使用 se
我有这个代码: let file = FileStorage { // ... }; file.write("Test", bytes.as_ref()) .map_err(|e| Mu
我只是尝试用angular-cli创建一个新项目,然后运行服务器,但是它停止并显示一条有趣的消息:Error: No errors。 我以这种方式更新了(希望有帮助):npm uninstall -g
我从我的 javascript 发送交易 Metamask 打开传输对话框 我确定 i get an error message in metamask (inpage.js:1 MetaMask -
这个问题在这里已经有了答案: How do you define custom `Error` types in Rust? (3 个答案) How to get a reference to a
我想知道两者之间有什么大的区别 if let error = error{} vs if error != nil?或者只是人们的不同之处,比如他们如何用代码表达自己? 例如,如果我使用这段代码: u
当我尝试发送超过 50KB 的图像时,我在 Blazor 服务器应用程序上收到以下错误消息 Error: Connection disconnected with error 'Error: Serv
我有一个error-page指令,它将所有异常重定向到错误显示页面 我的web.xml: [...] java.lang.Exception /vi
我有这样的对象: address: { "phone" : 888, "value" : 12 } 在 WHERE 中我需要通过 address.value 查找对象,但是在 SQL 中有函数
每次我尝试编译我的代码时,我都会遇到大量错误。这不是我的代码的问题,因为它在另一台计算机上工作得很好。我尝试重新安装和修复,但这没有帮助。这是整个错误消息: 1>------ Build starte
在我的代码的类部分,如果我写一个错误,则在不应该的情况下,将有几行报告为错误。我将'| error'放在可以从错误中恢复的良好/安全位置,但是我认为它没有使用它。也许它试图在某个地方恢复中间表情? 有
我遇到了 csv 输入文件整体读取故障的问题,我可以通过在 read_csv 函数中添加 "error_bad_lines=False" 来删除这些问题来解决这个问题。 但是我需要报告这些造成问题的文
在 Spring 中,验证后我们在 controller 中得到一个 BindingResult 对象。 很简单,如果我收到验证错误,我想重新显示我的表单,并在每个受影响的字段上方显示错误消息。 因此
我不知道出了什么问题,因为我用 Java 编程了大约一年,从来没有遇到过这个错误。在一分钟前在 Eclipse 中编译和运行工作,现在我得到这个错误: #A fatal error has been
SELECT to_char(messages. TIME, 'YYYY/MM/DD') AS FullDate, to_char(messages. TIME, 'MM/DD
我收到这些错误: AnonymousPath\Anonymized.vb : error BC30037: Character is not valid. AnonymousPath\Anonymiz
我刚刚安装了 gridengine 并在执行 qstat 时出现错误: error: commlib error: got select error (Connection refused) erro
嗨,我正在学习 PHP,我从 CRUD 系统开始,我在 Windows 上安装了 WAMP 服务器,当我运行它时,我收到以下错误消息。 SCREAM: Error suppression ignore
我刚刚开始一个新项目,我正在学习核心数据教程,可以找到:https://www.youtube.com/watch?v=zZJpsszfTHM 我似乎无法弄清楚为什么会抛出此错误。我有一个名为“Exp
当我使用 Jenkins 运行新构建时,出现以下错误: "FilePathY\XXX.cpp : fatal error C1853: 'FilePathZ\XXX.pch' precompiled
我是一名优秀的程序员,十分优秀!