gpt4 book ai didi

python - Beam Python Dataflow Runner 在 apply_WriteToBigQuery 中使用已弃用的 BigQuerySink 而不是 WriteToBigQuery

转载 作者:行者123 更新时间:2023-12-01 07:10:47 25 4
gpt4 key购买 nike

就DataflowRunner内部的实现细节而言,很多人可能并不关心使用的是BigQuerySink还是WriteToBigQuery

但是,就我而言,我尝试将代码部署到使用 RunTimeValueProvider 作为参数的数据流模板。 WriteToBigQuery 支持此行为:

class WriteToBigQuery(PTransform):
....

table (str, callable, ValueProvider): The ID of the table, or a callable
that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
numbers ``0-9``, or underscores ``_``. If dataset argument is
:data:`None` then the table argument must contain the entire table
reference specified as: ``'DATASET.TABLE'``
or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
argument representing an element to be written to BigQuery, and return
a TableReference, or a string table name as specified above.
Multiple destinations are only supported on Batch pipelines at the
moment.

BigQuerySink支持:

class BigQuerySink(dataflow_io.NativeSink):
table (str): The ID of the table. The ID must contain only letters
``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If
**dataset** argument is :data:`None` then the table argument must
contain the entire table reference specified as: ``'DATASET.TABLE'`` or
``'PROJECT:DATASET.TABLE'``.

更有趣的是,代码中的 BigQuerySink 自 2.11.0 起已被弃用。

@deprecated(since='2.11.0', current="WriteToBigQuery")

但是,在 DataFlowRunner 中,当前的代码和注释似乎完全不符合 WriteToBigQuery 是在 BigQuerySink 上使用的默认类的预期:

  def apply_WriteToBigQuery(self, transform, pcoll, options):
# Make sure this is the WriteToBigQuery class that we expected, and that
# users did not specifically request the new BQ sink by passing experiment
# flag.

# TODO(BEAM-6928): Remove this function for release 2.14.0.
experiments = options.view_as(DebugOptions).experiments or []
if (not isinstance(transform, beam.io.WriteToBigQuery)
or 'use_beam_bq_sink' in experiments):
return self.apply_PTransform(transform, pcoll, options)
if transform.schema == beam.io.gcp.bigquery.SCHEMA_AUTODETECT:
raise RuntimeError(
'Schema auto-detection is not supported on the native sink')
standard_options = options.view_as(StandardOptions)
if standard_options.streaming:
if (transform.write_disposition ==
beam.io.BigQueryDisposition.WRITE_TRUNCATE):
raise RuntimeError('Can not use write truncation mode in streaming')
return self.apply_PTransform(transform, pcoll, options)
else:
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
schema = None
if transform.schema:
schema = parse_table_schema_from_json(json.dumps(transform.schema))
return pcoll | 'WriteToBigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
transform.table_reference.tableId,
transform.table_reference.datasetId,
transform.table_reference.projectId,
schema,
transform.create_disposition,
transform.write_disposition,
kms_key=transform.kms_key))

我的问题有两个:

  1. 为什么 DataflowRunnerio.BigQuery 类之间的契约/期望存在差异?
  2. 在不等待错误修复的情况下,是否有人对如何强制 DataflowRunnerBigQuerySink 上使用 WriteToBigQuery 提出建议

最佳答案

WriteToBigQuery 转换有两种不同的写入 BigQuery 的策略:

  • 将插入流式传输到 BigQuery 端点
  • 定期触发文件加载作业(对于批处理管道触发一次)

对于 Python SDK,我们最初只支持流插入,并且我们有一个仅适用于数据流的文件加载运行程序 native 实现(这是 BigQuerySink)。

对于在 Dataflow 上运行的批处理管道,BigQuerySink 被替换为 - 正如您正确找到的那样。对于所有其他情况,都使用流式插入。

在 Beam 的最新版本中,我们在 SDK 中原生添加了对文件加载的支持 - 其实现位于 BigQueryBatchFileLoads 中。 .

因为我们不想破坏用户对旧行为的依赖,所以我们将 BigQueryBatchFileLoads 隐藏在实验标志后面。 (标志是use_beam_bq_sink)。

所以:

  • 在未来的版本中,我们将自动使用 BigQueryBatchFileLoads,但目前,您有两个选项可以访问它:

    1. 直接在管道中使用它(例如 input | BigQueryBatchFileLoads(...))。
    2. 在使用 WriteToBigQuery 时传递选项 --experiments use_beam_bq_sink

希望对您有所帮助!

关于python - Beam Python Dataflow Runner 在 apply_WriteToBigQuery 中使用已弃用的 BigQuerySink 而不是 WriteToBigQuery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58226848/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com