
google-bigquery - How best to handle data stored in different locations in Google BigQuery?


My current workflow in BigQuery is as follows:

(1) Query data in a public repository (stored in the US), (2) write it to a table in my own dataset, (3) export a CSV to a Cloud Storage bucket, (4) download the CSV to the server I work on, and (5) work with it on that server.
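For concreteness, a minimal sketch of this workflow with the google-cloud-bigquery and google-cloud-storage client libraries might look like the following; the project, dataset, table, and bucket names and the sample public table are placeholders, not values from the question:

from google.cloud import bigquery
from google.cloud import storage

PROJECT_ID = 'my-project'      # placeholder project name
US_DATASET = 'my_us_dataset'   # dataset located in the US
US_BUCKET = 'my-us-bucket'     # bucket located in the US

bq_client = bigquery.Client(project=PROJECT_ID)
gcs_client = storage.Client(project=PROJECT_ID)

# (1) + (2): query the public data and write the result into my own table.
dest_table = bigquery.TableReference.from_string(
    '{}.{}.public_extract'.format(PROJECT_ID, US_DATASET))
job_config = bigquery.QueryJobConfig()
job_config.destination = dest_table
query_job = bq_client.query(
    'SELECT name, number'
    ' FROM `bigquery-public-data.usa_names.usa_1910_current` LIMIT 1000',
    location='US',
    job_config=job_config)
query_job.result()  # Wait for the query to finish.

# (3): export the table as CSV to the Cloud Storage bucket.
extract_job = bq_client.extract_table(
    dest_table,
    'gs://{}/public_extract.csv'.format(US_BUCKET),
    location='US')  # CSV is the default destination format.
extract_job.result()  # Wait for the export to finish.

# (4): download the CSV to the local server, where it is used in step (5).
gcs_client.bucket(US_BUCKET).blob('public_extract.csv').download_to_filename(
    'public_extract.csv')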

The problem I have now is that the server I work on is located in the EU, so I pay quite a lot to transfer data between my US bucket and my EU server. I could go ahead and locate my bucket in the EU, but then I still have the problem of moving the data from BigQuery (US) to the bucket (EU). I could also set my BigQuery dataset to be located in the EU, but then I can no longer run my queries, because the data in the public repository is located in the US and queries across different locations are not allowed.

Does anyone know how to approach this?

Best Answer

One way to copy a BigQuery dataset from one region to another is to take advantage of the Storage Data Transfer Service. It doesn't get around the fact that you still have to pay for bucket-to-bucket network traffic, but it may save you the CPU time of copying the data over to your EU server yourself.

The process would be:

  1. Extract all of the BigQuery tables to a bucket in the same region as the tables. (Avro format is recommended for the best data type fidelity and fastest load speed.)
  2. Run a Storage Transfer Service job to copy the extracted files from the source bucket to a bucket in the destination location.
  3. Load all of the files into a BigQuery dataset located in the destination location.

Python example:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import sys
import time

import googleapiclient.discovery
from google.cloud import bigquery
import json
import pytz


PROJECT_ID = 'swast-scratch' # TODO: set this to your project name
FROM_LOCATION = 'US' # TODO: set this to the BigQuery location
FROM_DATASET = 'workflow_test_us' # TODO: set to BQ dataset name
FROM_BUCKET = 'swast-scratch-us' # TODO: set to bucket name in same location
TO_LOCATION = 'EU' # TODO: set this to the destination BigQuery location
TO_DATASET = 'workflow_test_eu' # TODO: set to destination dataset name
TO_BUCKET = 'swast-scratch-eu' # TODO: set to bucket name in destination loc

# Construct API clients.
bq_client = bigquery.Client(project=PROJECT_ID)
transfer_client = googleapiclient.discovery.build('storagetransfer', 'v1')


def extract_tables():
    # Extract all tables in a dataset to a Cloud Storage bucket.
    print('Extracting {}:{} to bucket {}'.format(
        PROJECT_ID, FROM_DATASET, FROM_BUCKET))

    tables = list(bq_client.list_tables(bq_client.dataset(FROM_DATASET)))
    extract_jobs = []
    for table in tables:
        job_config = bigquery.ExtractJobConfig()
        job_config.destination_format = bigquery.DestinationFormat.AVRO
        extract_job = bq_client.extract_table(
            table.reference,
            ['gs://{}/{}.avro'.format(FROM_BUCKET, table.table_id)],
            location=FROM_LOCATION,  # Available in 0.32.0 library.
            job_config=job_config)  # Starts the extract job.
        extract_jobs.append(extract_job)

    for job in extract_jobs:
        job.result()

    return tables


def transfer_buckets():
    # Transfer files from one region to another using storage transfer service.
    print('Transferring bucket {} to {}'.format(FROM_BUCKET, TO_BUCKET))
    now = datetime.datetime.now(pytz.utc)
    transfer_job = {
        'description': '{}-{}-{}_once'.format(
            PROJECT_ID, FROM_BUCKET, TO_BUCKET),
        'status': 'ENABLED',
        'projectId': PROJECT_ID,
        'transferSpec': {
            'transferOptions': {
                'overwriteObjectsAlreadyExistingInSink': True,
            },
            'gcsDataSource': {
                'bucketName': FROM_BUCKET,
            },
            'gcsDataSink': {
                'bucketName': TO_BUCKET,
            },
        },
        # Set start and end date to today (UTC) without a time part to start
        # the job immediately.
        'schedule': {
            'scheduleStartDate': {
                'year': now.year,
                'month': now.month,
                'day': now.day,
            },
            'scheduleEndDate': {
                'year': now.year,
                'month': now.month,
                'day': now.day,
            },
        },
    }
    transfer_job = transfer_client.transferJobs().create(
        body=transfer_job).execute()
    print('Returned transferJob: {}'.format(
        json.dumps(transfer_job, indent=4)))

    # Find the operation created for the job.
    job_filter = {
        'project_id': PROJECT_ID,
        'job_names': [transfer_job['name']],
    }

    # Wait until the operation has started.
    response = {}
    while ('operations' not in response) or (not response['operations']):
        time.sleep(1)
        response = transfer_client.transferOperations().list(
            name='transferOperations', filter=json.dumps(job_filter)).execute()

    operation = response['operations'][0]
    print('Returned transferOperation: {}'.format(
        json.dumps(operation, indent=4)))

    # Wait for the transfer to complete.
    print('Waiting ', end='')
    while operation['metadata']['status'] == 'IN_PROGRESS':
        print('.', end='')
        sys.stdout.flush()
        time.sleep(5)
        operation = transfer_client.transferOperations().get(
            name=operation['name']).execute()
    print()

    print('Finished transferOperation: {}'.format(
        json.dumps(operation, indent=4)))


def load_tables(tables):
    # Load all tables into the new dataset.
    print('Loading tables from bucket {} to {}:{}'.format(
        TO_BUCKET, PROJECT_ID, TO_DATASET))

    load_jobs = []
    for table in tables:
        dest_table = bq_client.dataset(TO_DATASET).table(table.table_id)
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.AVRO
        load_job = bq_client.load_table_from_uri(
            ['gs://{}/{}.avro'.format(TO_BUCKET, table.table_id)],
            dest_table,
            location=TO_LOCATION,  # Available in 0.32.0 library.
            job_config=job_config)  # Starts the load job.
        load_jobs.append(load_job)

    for job in load_jobs:
        job.result()


# Actually run the script.
tables = extract_tables()
transfer_buckets()
load_tables(tables)

The preceding sample uses the google-cloud-bigquery library for the BigQuery API and google-api-python-client for the Storage Data Transfer API.

Note that this sample does not account for partitioned tables.
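If partitioned tables do matter for your case, one possible extension (not part of the original answer) is to extract each date partition separately via the table$YYYYMMDD partition decorator, so partitions can later be loaded back one by one. Below is a rough sketch of the extract side only, assuming the same constants as above and a google-cloud-bigquery version that provides Client.list_partitions:

def extract_partitioned_table(table):
    # Extract a date-partitioned table one partition at a time, using the
    # "table$YYYYMMDD" partition decorator, so that each partition lands in
    # its own Avro file and can later be loaded into the matching partition
    # of the destination table.
    extract_jobs = []
    for partition_id in bq_client.list_partitions(table.reference):
        job_config = bigquery.ExtractJobConfig()
        job_config.destination_format = bigquery.DestinationFormat.AVRO
        source = bigquery.TableReference(
            bq_client.dataset(FROM_DATASET),
            '{}${}'.format(table.table_id, partition_id))
        extract_job = bq_client.extract_table(
            source,
            ['gs://{}/{}_{}.avro'.format(
                FROM_BUCKET, table.table_id, partition_id)],
            location=FROM_LOCATION,
            job_config=job_config)
        extract_jobs.append(extract_job)

    for job in extract_jobs:
        job.result()

The corresponding load step would then target the matching table$YYYYMMDD decorators in the destination dataset.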

Regarding google-bigquery - How best to handle data stored in different locations in Google BigQuery?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/32767245/
