
python - How do I run a BigQuery query and then send the output CSV to Google Cloud Storage in Apache Airflow?


I need to run a BigQuery script in Python, and it needs to output a CSV to Google Cloud Storage. Currently, my script triggers the BigQuery code and saves the result directly to my PC.

However, I need it to run in Airflow, so I can't have any local dependencies.

My current script saves the output to my local machine, and then I have to move it to GCS. I've looked around online but couldn't figure it out. (P.S. I'm very new to Python, so apologies if this has been asked before!)

import datetime

import pandas as pd
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

def run_script():
    # Run the query in BigQuery and load the result into a DataFrame.
    df = pd.read_gbq('SELECT * FROM `table/veiw` LIMIT 15000',
                     project_id='PROJECT',
                     dialect='standard'
                     )

    # Save the result locally as a CSV.
    df.to_csv('XXX.csv', index=False)

def copy_to_gcs(filename, bucket, destination_filename):
    # Upload a local file to a GCS bucket via the JSON API.
    credentials = GoogleCredentials.get_application_default()
    service = discovery.build('storage', 'v1', credentials=credentials)

    body = {'name': destination_filename}
    req = service.objects().insert(bucket=bucket, body=body, media_body=filename)
    resp = req.execute()

current_date = datetime.date.today()
filename = (r"C:\Users\LOCALDRIVE\ETC\ETC\ETC.csv")
bucket = 'My GCS BUCKET'

str_prefix_datetime = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
destfile = 'XXX' + str_prefix_datetime + '.csv'
print('')
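For the "copy the local CSV into GCS" step on its own (outside Airflow), here is a minimal sketch using the google-cloud-storage client library; the bucket and object names below are placeholders, not values from the original script:

from google.cloud import storage

def upload_to_gcs(local_path, bucket_name, destination_blob_name):
    # Uses Application Default Credentials, like the discovery-based code above.
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(local_path)

upload_to_gcs('XXX.csv', 'my-bucket', 'exports/XXX.csv')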

Best answer

Airflow provides several operators for working with BigQuery.

  • BigQueryOperator executes a query on BigQuery.
  • BigQueryToCloudStorageOperator exports a BigQuery table (for example, the destination table of a query) to GCS.

  • You can see an example that runs a query and then exports the results to a CSV in the Cloud Composer code samples.
    # Copyright 2018 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # https://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    from airflow.contrib.operators import bigquery_operator
    from airflow.contrib.operators import bigquery_to_gcs

    # Query recent StackOverflow questions and write the result to a destination
    # table. max_query_date, min_query_date and bq_recent_questions_table_id are
    # defined earlier in the sample DAG.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id='bq_recent_questions_query',
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
          AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(max_date=max_query_date, min_date=min_query_date),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id)

    # Export the query result (the destination table) to Cloud Storage as CSV.
    # output_file is a gs:// URI defined earlier in the sample DAG.
    export_questions_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
        task_id='export_recent_questions_to_gcs',
        source_project_dataset_table=bq_recent_questions_table_id,
        destination_cloud_storage_uris=[output_file],
        export_format='CSV')
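The sample above omits how the two tasks are chained; a minimal sketch of the dependency, assuming both operators are defined inside the same DAG as in the Cloud Composer sample (the gs:// URI shown in the comment is a placeholder):

    # output_file is typically a gs:// URI, e.g. 'gs://my-bucket/recent_questions.csv'.
    # Export to GCS only after the query has populated its destination table.
    bq_recent_questions_query >> export_questions_to_gcs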

This question was originally asked on Stack Overflow: https://stackoverflow.com/questions/58456094/
