gpt4 book ai didi

java - 是否可以在用Python编写的Dataflow流管道中导入Java方法 `wrapBigQueryInsertError`?

转载 作者:行者123 更新时间:2023-11-30 05:24:15 26 4
gpt4 key购买 nike

我正在尝试使用 Python3 创建一个 Dataflow 流式传输管道,该管道从 Pub/Sub 主题读取消息,最终“从头开始”将它们写入 BigQuery 表上。我在名为 PubSubToBigQuery.java 的 Dataflow Java 模板中看到过(执行我正在寻找的内容)第三步中的一段代码,用于处理那些转换为表行的 Pub/Sub 消息,当您尝试将它们插入 BigQuery 表时,这些消息会失败。最后,在步骤 4 和 5 的代码片段中,这些代码片段被展平并插入到错误表中:

  • 第 3 步:
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
  • 第 4 步和第 5 步
    PCollectionList.of(
ImmutableList.of(
convertedTableRows.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT)))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
ErrorConverters.WritePubsubMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());


failedInserts.apply(
"WriteFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ValueProviderUtils.maybeUseDefaultDeadletterTable(
options.getOutputDeadletterTable(),
options.getOutputTableSpec(),
DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());

为了做到这一点,我怀疑实现这一点的关键在于模板中第一个导入的库:

package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;

这个方法在Python中可用吗?

如果没有,有某种方法可以在 Python 中执行相同的操作,即不检查应插入的记录字段的结构和数据类型是否与 BigQuery 表所期望的相对应?

这种解决方法会严重减慢我的流传输管道的速度。

最佳答案

在 Beam Python 中,执行流式 BigQuery 写入时,转换会返回 BigQuery 写入期间失败的行。请参阅https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1248

所以你可以用与Java模板相同的方式处理这些。

关于java - 是否可以在用Python编写的Dataflow流管道中导入Java方法 `wrapBigQueryInsertError`?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58995447/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com