gpt4 book ai didi

google-bigquery - 将数据从 Google Storage 加载到 BigQuery 时如何执行 UPSERT?

转载 作者:行者123 更新时间:2023-12-01 15:37:55 30 4
gpt4 key购买 nike

BigQuery 支持以下策略:
WRITE_APPEND - 指定可以将行附加到现有表中。
WRITE_EMPTY - 指定输出表必须为空。
WRITE_TRUNCATE - 指定写入应替换表。

它们都不适合 UPSERT 的目的手术。

我正在将订单 Json 文件导入 Google Storage,我想将其加载到 BigQuery 中。按照逻辑,一些记录是新的,而其他记录已经存在于以前的加载中并且需要更新(例如更新订单状态(新的/暂停/发送/退款等...)

我正在使用 Airflow,但我的问题很笼统:

update_bigquery = GoogleCloudStorageToBigQueryOperator(
dag=dag,
task_id='load_orders_to_BigQuery',
bucket=GCS_BUCKET_ID,
destination_project_dataset_table=table_name_template,
source_format='NEWLINE_DELIMITED_JSON',
source_objects=[gcs_export_uri_template],
schema_fields=dc(),
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
skip_leading_rows = 1,
google_cloud_storage_conn_id=CONNECTION_ID,
bigquery_conn_id=CONNECTION_ID
)

此代码使用 WRITE_TRUNCATE这意味着它删除整个表并加载请求的文件。

如何修改以支持 UPSERT ?

我唯一的选择是查询表搜索以查找出现在 json 中的现有订单删除它们然后执行 LOAD ?

最佳答案

而不是运行 GoogleCloudStorageToBigQueryOperator ,您可以运行一个查询,该查询将为您提供与 upsert 相同的结果。

来自 https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement 的示例:

MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.product = S.product
WHEN MATCHED THEN
UPDATE SET quantity = T.quantity + S.quantity
WHEN NOT MATCHED THEN
INSERT (product, quantity) VALUES(product, quantity)

此查询将:
  • 查看表 T(当前)和 S(更新)。
  • 如果更新更改了现有行,它将运行 UPDATE在那一行。
  • 如果更新有一个尚不存在的产品,它将 INSERT那个新行。

  • 现在,BigQuery 如何知道您的表 S ?您可以:
  • 使用 GoogleCloudStorageToBigQueryOperator 将其加载到 BQ 到不同的表中.
  • 或者,您可以设置一个直接查看 GCS 的联合表 - 我在 https://medium.com/google-cloud/bigquery-lazy-data-loading-ddl-dml-partitions-and-half-a-trillion-wikipedia-pageviews-cd3eacd657b6 中这样做了
  • 关于google-bigquery - 将数据从 Google Storage 加载到 BigQuery 时如何执行 UPSERT?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51962650/

    30 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com