- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在我的管道中,我使用 WriteToBigQuery 是这样的:
| beam.io.WriteToBigQuery(
'thijs:thijsset.thijstable',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
The beam.io.WriteToBigQuery PTransform returns a dictionary whose BigQueryWriteFn.FAILED_ROWS entry contains a PCollection of all the rows that failed to be written.
| "print" >> beam.Map(print)
AttributeError: 'dict' object has no attribute 'pipeline'
{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}
| beam.io.WriteToBigQuery(
'thijs:thijsset.thijstable',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)
最佳答案
处理无效输入的死信是一种常见的 Beam/Dataflow 用法,适用于 Java 和 Python SDK,但后者的示例并不多。
想象一下,我们有一些虚拟输入数据,其中包含 10 行好行和不符合表模式的坏行:
schema = "index:INTEGER,event:STRING"
data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')
events
):
events = (p
| "Create data" >> beam.Create(data)
| "CSV to dict" >> beam.ParDo(CsvToDictFn())
| "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
"{0}:dataflow_test.good_lines".format(PROJECT),
schema=schema,
)
)
FAILED_ROWS
侧面输出:
(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
| "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))
DirectRunner
并将好行写到 BigQuery:
$ cat error_log.txt-00000-of-00001
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})
DataflowRunner
运行它你需要一些额外的标志。如果你面对
TypeError: 'PDone' object has no attribute '__getitem__'
错误你需要添加
--experiments=use_beam_bq_sink
使用新的 BigQuery 接收器。
KeyError: 'FailedRows'
这是因为新水槽会
default为批处理管道加载 BigQuery 作业:
STREAMING_INSERTS, FILE_LOADS, or DEFAULT. An introduction on loading data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data. DEFAULT will use STREAMING_INSERTS on Streaming pipelines and FILE_LOADS on Batch pipelines.
method='STREAMING_INSERTS'
来覆盖该行为。在
WriteToBigQuery
:
DirectRunner
和
DataflowRunner
here .
关于python-3.x - 监控 WriteToBigQuery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59102519/
我目前有一个 python 数据流作业,其最终接收器是 PCollection 写入 BigQuery。它失败并出现以下错误: Workflow failed. Causes: S01:XXXX+XX
在我的管道中,我使用 WriteToBigQuery 是这样的: | beam.io.WriteToBigQuery( 'thijs:thijsset.thijstable',
我有一个流式管道,它从发布/订阅中获取消息,解析它们,然后写入 BigQuery。挑战在于,每条消息都会根据消息中的 event 属性转到不同的事件表,并且它们没有排序。 这意味着(我相信)Write
就DataflowRunner内部的实现细节而言,很多人可能并不关心使用的是BigQuerySink还是WriteToBigQuery。 但是,就我而言,我尝试将代码部署到使用 RunTimeValu
我有一个 Google App Engine 触发 Cloud DataFlow 管道。该管道应该将最终的 PCollection 写入 Google BigQuery,但我找不到安装正确的 apac
我是一名优秀的程序员,十分优秀!