gpt4 book ai didi

google-cloud-platform - GCP Dataflow Apache Beam 写入输出错误处理

转载 作者:行者123 更新时间:2023-12-03 09:00:18 25 4
gpt4 key购买 nike

我需要对数据流应用错误处理,以使用相同的主键多次插入 Spanner。逻辑是,在当前消息之后可能会收到较旧的消息,并且我不想覆盖保存的值。因此,我将创建我的突变作为插入,并在尝试重复插入时抛出错误。

我在 DoFn 中看到了几个 try block 的示例,它们写入侧面输出以记录任何错误。这是一个非常好的解决方案,但我需要对写入不包含 DoFn 的 Spanner 的步骤应用错误处理

spannerBranchTuples2.get(spannerOutput2)
.apply("Create Spanner Mutation", ParDo.of(createSpannerMutation))
.apply("Write Spanner Records", SpannerIO.write()
.withInstanceId(options.getSpannerInstanceId())
.withDatabaseId(options.getSpannerDatabaseId())
.grouped());

我还没有找到任何允许将错误处理应用于此步骤的文档,或者找到一种将其重写为 DoFn 的方法。有什么建议如何对此应用错误处理吗?谢谢

最佳答案

有一个有趣的 pattern for this in Dataflow documentation .

基本上,这个想法是在将结果发送到写入转换之前有一个 DoFn。它看起来像这样:

    final TupleTag<Output> successTag = new TupleTag<>() {};
final TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* … */;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element());
} catch (Exception e) {
LOG.severe("Failed to process input {} -- adding to dead letter file",
c.element(), e);
c.sideOutput(deadLetterTag, c.element());
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

outputTuple.get(deadLetterTag)
.apply(/* Write to a file or table or anything */);

outputTuple.get(successTag)
.apply(/* Write to Spanner or any other sink */);

请告诉我这是否有用!

关于google-cloud-platform - GCP Dataflow Apache Beam 写入输出错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51134456/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com