gpt4 book ai didi

google-bigquery - 如何捕获 BigQueryIO.Write 抛出的任何异常并挽救输出失败的数据?

转载 作者:行者123 更新时间:2023-12-04 18:57:50 28 4
gpt4 key购买 nike

我想从 Cloud Pub/Sub 读取数据并使用 Cloud Dataflow 将其写入 BigQuery。每个数据都包含一个表 ID,数据本身将保存在其中。

写入 BigQuery 失败的因素有多种:

  • 表 ID 格式错误。
  • 数据集不存在。
  • 数据集不允许管道访问。
  • 网络故障。

  • 当其中一个失败发生时,流式作业将重试该任务并停止。我尝试使用 WriteResult.getFailedInserts()为了抢救坏数据,避免卡顿,但效果不佳。有什么好办法吗?

    这是我的代码:

    public class StarterPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    public class MyData implements Serializable {
    String table_id;
    }

    public interface MyOptions extends PipelineOptions {
    @Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
    @Validation.Required
    ValueProvider<String> getInputTopic();
    void setInputTopic(ValueProvider<String> value);
    }

    public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

    Pipeline p = Pipeline.create(options);

    PCollection<MyData> input = p
    .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
    .apply("ParseJSON", MapElements.into(TypeDescriptor.of(MyData.class))
    .via((String text) -> new Gson().fromJson(text, MyData.class)));
    WriteResult writeResult = input
    .apply("WriteToBigQuery", BigQueryIO.<MyData>write()
    .to(new SerializableFunction<ValueInSingleWindow<MyData>, TableDestination>() {
    @Override
    public TableDestination apply(ValueInSingleWindow<MyData> input) {
    MyData myData = input.getValue();
    return new TableDestination(myData.table_id, null);
    }
    })
    .withSchema(new TableSchema().setFields(new ArrayList<TableFieldSchema>() {{
    add(new TableFieldSchema().setName("table_id").setType("STRING"));
    }}))
    .withFormatFunction(new SerializableFunction<MyData, TableRow>() {
    @Override
    public TableRow apply(MyData myData) {
    return new TableRow().set("table_id", myData.table_id);
    }
    })
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry()));
    writeResult.getFailedInserts()
    .apply("LogFailedData", ParDo.of(new DoFn<TableRow, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
    TableRow row = c.element();
    LOG.info(row.get("table_id").toString());
    }
    }));

    p.run();
    }
    }

    最佳答案

    在管道定义中写入输出时,没有简单的方法可以捕获异常。我想你可以通过编写自定义 PTransform 来做到这一点。对于 BigQuery。但是,在 Apache Beam 中无法原生实现。我还建议不要这样做,因为它会破坏 Cloud Dataflow 的自动重试功能。

    在您的代码示例中,您将失败的插入重试策略设置为从不重试。您可以将策略设置为始终重试。这仅在间歇性网络故障( 第 4 个要点 )之类的情况下有效。

    .withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry())

    如果表 ID 格式不正确( 第一个要点 ),则 CREATE_IF_NEEDED create disposition 配置应允许 Dataflow 作业自动创建新表而不会出错,即使表 ID 不正确。

    如果数据集不存在或数据集存在访问权限问题( 第二和第三个要点 ),那么我的观点是流作业应该停止并最终失败。没有人工干预,在任何情况下都无法进行。

    关于google-bigquery - 如何捕获 BigQueryIO.Write 抛出的任何异常并挽救输出失败的数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48002114/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com