
python-3.x - Monitoring WriteToBigQuery


In my pipeline I use WriteToBigQuery like this:

| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

This returns a dict, as described in the documentation:

The beam.io.WriteToBigQuery PTransform returns a dictionary whose BigQueryWriteFn.FAILED_ROWS entry contains a PCollection of all the rows that failed to be written.



How can I print this dict and turn it into a PCollection, or how can I print only the FAILED_ROWS?

If I do: | "print" >> beam.Map(print)
then I get: AttributeError: 'dict' object has no attribute 'pipeline'
I must have read a hundred pipelines by now, but I never see anything placed after WriteToBigQuery.

[Edit]
When I finish the pipeline and store the result in a variable, I have the following:
{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}

But I don't know how to use this result in the pipeline, like this:
| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)

Best Answer

Routing invalid input to a dead-letter output is a common Beam/Dataflow pattern, available in both the Java and Python SDKs, although there are not many examples for the latter.

Imagine we have some dummy input data with 10 good lines and one bad line that does not conform to the table schema:

schema = "index:INTEGER,event:STRING"

data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')

Then, what I do is give the write result a name (events in this case):

events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
    )
)
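CsvToDictFn is not defined in the post; a minimal sketch of what it might look like, assuming it simply pairs the schema field names with the comma-split values (which is consistent with the failed-row output shown further down):

class CsvToDictFn(beam.DoFn):
    """Hypothetical DoFn: turn an 'index,event' CSV line into a row dict."""
    FIELDS = ['index', 'event']

    def process(self, element):
        # Pair each schema field with the corresponding CSV value; a malformed
        # line produces fewer keys and gets rejected by BigQuery.
        yield dict(zip(self.FIELDS, element.split(',')))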

and then access the FAILED_ROWS side output:

(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
| "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))

This works with the DirectRunner and writes the good lines to BigQuery:


and the bad ones to a local file:

$ cat error_log.txt-00000-of-00001 
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})

If you run it with the DataflowRunner you will need some additional flags. If you face the TypeError: 'PDone' object has no attribute '__getitem__' error, you need to add --experiments=use_beam_bq_sink to use the new BigQuery sink.
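For example, the experiment flag can be passed alongside the usual Dataflow options; the project and bucket names below are placeholders:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=DataflowRunner",
    "--project=PROJECT_ID",             # placeholder project
    "--temp_location=gs://BUCKET/tmp",  # placeholder bucket
    "--experiments=use_beam_bq_sink",   # enable the new BigQuery sink
])

p = beam.Pipeline(options=options)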

If you get KeyError: 'FailedRows', it is because the new sink defaults to BigQuery load jobs for batch pipelines:

STREAMING_INSERTS, FILE_LOADS, or DEFAULT. An introduction on loading data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data. DEFAULT will use STREAMING_INSERTS on Streaming pipelines and FILE_LOADS on Batch pipelines.



You can override that behavior by specifying method='STREAMING_INSERTS' in WriteToBigQuery:

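The original screenshot is not available here; the override would look roughly like this, reusing the write step from the earlier snippet:

| "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
    "{0}:dataflow_test.good_lines".format(PROJECT),
    schema=schema,
    method='STREAMING_INSERTS',  # force streaming inserts so FAILED_ROWS is populated
)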

Full code for both the DirectRunner and the DataflowRunner here.

About python-3.x - Monitoring WriteToBigQuery: a similar question was found on Stack Overflow: https://stackoverflow.com/questions/59102519/
