gpt4 book ai didi

python - 从 Pub/Sub 流式传输到 BigQuery

转载 作者:太空狗 更新时间:2023-10-30 01:26:26 24 4
gpt4 key购买 nike

我正在尝试使用 python 数据流将一些数据从 google PubSub 流式传输到 BigQuery。出于测试目的,我修改了以下代码 https://github.com/GoogleCloudPlatform/DataflowSDK-examples/blob/master/python/dataflow_examples/cookbook/bigquery_schema.py通过设置

options.view_as(StandardOptions).streaming = True

然后我将 record_ids 管道更改为从 Pub/Sub 读取

# ADDED THIS
lines = p | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC) | beam.WindowInto(window.FixedWindows(15))
# CHANGED THIS # record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
record_ids = lines | 'Split' >> (beam.FlatMap(split_fn).with_output_types(unicode))
records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
records | 'Write' >> beam.io.Write(
beam.io.BigQuerySink(
OUTPUT,
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

注意:我已被谷歌列入白名单以运行代码(在 alpha 中)

现在当我尝试时出现错误

工作流程失败。原因:(f215df7c8fcdbb00):未知的流接收器:bigquery

您可以在这里找到完整的代码:https://github.com/marcorigodanzo/gcp_streaming_test/blob/master/my_bigquery_schema.py

我认为这与现在的流式管道有关,谁能告诉我如何在流式管道中执行 bigQuery 写入?

最佳答案

Beam Python 不支持从流式管道写入 BigQuery。现在,您需要使用 Beam Java - 您可以分别使用 PubsubIO.readStrings()BigQueryIO.writeTableRows()

关于python - 从 Pub/Sub 流式传输到 BigQuery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46084361/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com