- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想从 Cloud Pub/Sub 读取数据并使用 Cloud Dataflow 将其写入 BigQuery。每个数据都包含一个表 ID,数据本身将保存在其中。
写入 BigQuery 失败的因素有多种:
WriteResult.getFailedInserts()
为了抢救坏数据,避免卡顿,但效果不佳。有什么好办法吗?
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
public class MyData implements Serializable {
String table_id;
}
public interface MyOptions extends PipelineOptions {
@Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
@Validation.Required
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<MyData> input = p
.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
.apply("ParseJSON", MapElements.into(TypeDescriptor.of(MyData.class))
.via((String text) -> new Gson().fromJson(text, MyData.class)));
WriteResult writeResult = input
.apply("WriteToBigQuery", BigQueryIO.<MyData>write()
.to(new SerializableFunction<ValueInSingleWindow<MyData>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<MyData> input) {
MyData myData = input.getValue();
return new TableDestination(myData.table_id, null);
}
})
.withSchema(new TableSchema().setFields(new ArrayList<TableFieldSchema>() {{
add(new TableFieldSchema().setName("table_id").setType("STRING"));
}}))
.withFormatFunction(new SerializableFunction<MyData, TableRow>() {
@Override
public TableRow apply(MyData myData) {
return new TableRow().set("table_id", myData.table_id);
}
})
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry()));
writeResult.getFailedInserts()
.apply("LogFailedData", ParDo.of(new DoFn<TableRow, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = c.element();
LOG.info(row.get("table_id").toString());
}
}));
p.run();
}
}
最佳答案
在管道定义中写入输出时,没有简单的方法可以捕获异常。我想你可以通过编写自定义 PTransform
来做到这一点。对于 BigQuery。但是,在 Apache Beam 中无法原生实现。我还建议不要这样做,因为它会破坏 Cloud Dataflow 的自动重试功能。
在您的代码示例中,您将失败的插入重试策略设置为从不重试。您可以将策略设置为始终重试。这仅在间歇性网络故障( 第 4 个要点 )之类的情况下有效。
.withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())
CREATE_IF_NEEDED
create disposition 配置应允许 Dataflow 作业自动创建新表而不会出错,即使表 ID 不正确。
关于google-bigquery - 如何捕获 BigQueryIO.Write 抛出的任何异常并挽救输出失败的数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48002114/
我阅读了博文 JRuby Performance: Exceptions are not flow control提倡在特殊情况下避免使用异常。 我意识到我犯了定期使用 rescue 处理 LoadE
我在 Controller 中有这个: pickuptime = params[:appointment][:pickuptime] pickuptime = DateTime.strptime(pi
我正在使用 Rails gem 通过 RestClient 向 api 发送请求。我需要挽救 401 错误代码。我在 RestClient 文档中看到了以下内容: > RestClient.get('
根据这篇文章: http://blog.plataformatec.com.br/2012/01/my-five-favorite-hidden-features-in-rails-3-2/ 处理错误
我是一名优秀的程序员,十分优秀!