- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试使用 Dataflow 创建一个流式传输管道,该管道从 PubSub 主题读取消息,最终将它们写入 BigQuery 表。我不想使用任何数据流模板。
目前我只想在从 Google VM 实例执行的 Python3 脚本中创建一个管道,以执行从 Pubsub 到达的每条消息的加载和转换过程(解析它包含的记录并添加一个新字段) 最终将结果写入 BigQuery 表。
简化一下,我的代码是:
#!/usr/bin/env python
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1,
import apache_beam as beam
import apache_beam.io.gcp.bigquery
import logging
import argparse
import sys
import json
from datetime import datetime, timedelta
def load_pubsub(message):
try:
data = json.loads(message)
records = data["messages"]
return records
except:
raise ImportError("Something went wrong reading data from the Pub/Sub topic")
class ParseTransformPubSub(beam.DoFn):
def __init__(self):
self.water_mark = (datetime.now() + timedelta(hours = 1)).strftime("%Y-%m-%d %H:%M:%S.%f")
def process(self, records):
for record in records:
record["E"] = self.water_mark
yield record
def main():
table_schema = apache_beam.io.gcp.bigquery.parse_table_schema_from_json(open("TableSchema.json"))
parser = argparse.ArgumentParser()
parser.add_argument('--input_topic')
parser.add_argument('--output_table')
known_args, pipeline_args = parser.parse_known_args(sys.argv)
with beam.Pipeline(argv = pipeline_args) as p:
pipe = ( p | 'ReadDataFromPubSub' >> beam.io.ReadStringsFromPubSub(known_args.input_topic)
| 'LoadJSON' >> beam.Map(load_pubsub)
| 'ParseTransform' >> beam.ParDo(ParseTransformPubSub())
| 'WriteToAvailabilityTable' >> beam.io.WriteToBigQuery(
table = known_args.output_table,
schema = table_schema,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
(例如)在 PubSub 主题中发布的消息使用如下:
'{"messages":[{"A":"Alpha", "B":"V1", "C":3, "D":12},{"A":"Alpha", "B":"V1", "C":5, "D":14},{"A":"Alpha", "B":"V1", "C":3, "D":22}]}'
如果在记录中添加字段“E”,则记录的结构(Python 中的字典)和字段的数据类型是 BigQuery 表所期望的。
我要处理的问题是:
如果某些消息带有意外的结构,我想 fork 管道展平并将它们写入另一个 BigQuery 表。
如果某些消息带有意外的字段数据类型,那么在管道的最后一级将它们写入表中时会发生错误。我想通过将记录转移到第三个表来管理此类错误。
我阅读了以下页面上的文档,但一无所获: https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline https://cloud.google.com/dataflow/docs/guides/common-errors
顺便说一句,如果我选择通过从 PubSubSubscription 读取并写入 BigQuery 的模板来配置管道的选项,我会得到以下架构,结果与我正在寻找的架构相同:
最佳答案
您无法捕捉到 BigQuery 的接收器中发生的错误。你写入 bigquery 的消息一定是好的。
最好的模式是执行一个检查消息结构和字段类型的转换。如果出现错误,您创建一个错误流程并将此问题流程写入一个文件(例如,或者在一个没有模式的表中,您以纯文本形式写入您的消息)
关于python - 如何使用 Python 处理 Dataflow 管道中的 BigQuery 插入错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58863561/
我有一个包含数据的表,其中在 A 列中我有一组重复的数据(一个接一个)。 我只想根据 A 列中的值(没有其他条件)选择每个组的第一行。请注意,我还希望为提到的新发现的行选择所有相应的列(我不想排除它们
我有一个包含 340GB 数据的表,但我们只使用了最后一周的数据。因此,为了最小化将这些数据移动到分区表或分片表的成本计划。 我对分片表和分区做了一些实验。我创建了分区表并加载了两天的数据(两个分区)
我想安排将数据从 GCS 存储桶加载到 BigQuery 表。如果我使用 bigquery-transfer 与调度及时的 bigquery-loads 的核心区别是什么? 最佳答案 它们是相同的。
我想安排将数据从 GCS 存储桶加载到 BigQuery 表。如果我使用 bigquery-transfer 与调度及时的 bigquery-loads 的核心区别是什么? 最佳答案 它们是相同的。
我想编写一个 BigQuery 命令行命令来检索 BigQuery 表的最后修改时间。我怎样才能做到这一点? 仅当 BigQuery 表的最后修改日期时间大于某个日期时间时,我才会使用它。 最佳答案
我似乎无法将任何数据从 Socrata 上传到 BigQuery。我收到“加载操作中的 BigQuery 错误:无法连接 BigQuery 服务器。”最初我得到的是 0 错误错误的限制。现在我已将 C
我正在尝试弄清楚是否可以从大查询中导出 hyperloglog 草图并在外部合并它们以进行基数估计。是否有可用的开源库可以轻松解析大型查询草图? 如果不是,是否有任何关于 biq 查询的 hyperl
这是我用作https://cloud.google.com/bigquery/docs/managing-tables#bigquery-copy-table-python的引用的代码: source
构建管道时,源是 BigQueryIO.Read,您会得到一组 TableRow 对象以供使用。 我基本上想对那些 TableRow 对象进行一些小的更改,然后使用 BigQueryIO.Write
BigQuery API Client Libraries 之间有什么区别?和 BigQuery Storage API Client Libraries ? 在 BigQuery Storage R
据我所知,将数据流式传输到 BigQuery 会导致重复行,正如这里提到的 https://cloud.google.com/bigquery/streaming-data-into-bigquery
我在 BigQuery Jobs API 中注意到复制任务: https://developers.google.com/bigquery/docs/reference/v2/jobs#resourc
https://cloud.google.com/bigquery/docs/reference/datatransfer/rest/ 我正在寻找“bigquery 数据传输服务”的 php 客户端库
我正在从 GCS 中的 CSV 文件到 BQ 执行一些 ETL,一切正常,除了日期。我的表中的字段名称是 TEST_TIME,类型是 DATE,所以在 TableRow 中我尝试传递一个 java.u
我已经阅读了 BigQuery 连接器的文档(https://support.google.com/360suite/datastudio/answer/6370296?hl=en)。 我想将自定义查
当两个不同的billing account下有两个project,并且有跨两个project的授权view时,view的查询费用由哪个billing account来计费? 场景:项目 A 包含使用项
所以我有一张购买表: 用户编号 购买时间 数量 我有一张网站上的用户事件表: 用户编号 位置 浏览时间 如何在不超过 purchase_time 的情况下将 purchases 表与 activiti
我有一个 unix 时间戳列,在我的 csv 文件中以毫秒表示。现在,当我将这些数据插入到我的 bigQuery 表中并查询它时,我得到了这个错误 bigQuery not supporting mi
我目前正在将 BigQuery 表提取到 Google Cloud Storage 中的分片 .csv 中——是否有任何方法可以对提取的行进行洗牌/随机化? GCS .csv 将用作 GCMLE 模型
我需要从数据流更新和删除 BigQuery 中的记录。数据来自 Pubsub,并带有标识操作插入、更新、删除 (I、U、D) 的标志。插入不是问题。 有更新和删除的建议吗? 最佳答案 Dataflow
我是一名优秀的程序员,十分优秀!