gpt4 book ai didi

airflow - Cloud Composer 将文件写入其他存储桶问题

转载 作者:行者123 更新时间:2023-12-04 10:53:26 25 4
gpt4 key购买 nike

Airflow 新手。我正在尝试将结果保存到另一个存储桶(不是 Airflow 存储桶)中的文件中。
我可以保存到“/home/airflow/gcs/data/test.json”中的文件,然后使用 gcs_hook.GoogleCloudStorageHook 复制到另一个存储桶。这是代码:

def write_file_func(**context):
file = f'/home/airflow/gcs/data/test.json'
with open(file, 'w') as f:
f.write(json.dumps('{"name":"aaa", "age":"10"}'))
def upload_file_func(**context):
conn = gcs_hook.GoogleCloudStorageHook()
source_bucket = 'source_bucket'
source_object = 'data/test.json'
target_bucket = 'target_bucket'
target_object = 'test.json'
conn.copy(source_bucket, source_object, target_bucket, target_object)
conn.delete(source_bucket, source_object)

我的问题是:
  • 我们可以直接写入目标存储桶的文件吗?我在 gcs_hook 中没有找到任何方法。
  • 我尝试使用google.cloud.storage bucket.blob('test.json').upload_from_string(),但是 Airflow 一直说“服务器的DAGBag中没有DAG”,很烦人,我们不允许在 DAG 中使用该 API?
  • 如果我们可以直接使用 google.cloud.storage/bigquery API,那和 Airflow API 有什么区别,比如 gcs_hook/bigquery_hook?

  • 谢谢

    最佳答案

  • 不,您不能“直接写入目标存储桶中的文件”。要修改存储在 GCS 中的文件,您需要将其下载到本地,进行文件更改,然后将修改后的文件上传回 GCS。有关详细信息,请参阅 [Google 云存储][1] 和 [方法][2]。
  • 我在 Apache Airflow 中成功编译了以下代码。随意使用它。


  • import pip
    import logging
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
    import json
    from datetime import datetime
    def write_file_func():
    file = f'/home/airflow/gcs/data/test.json'
    with open(file, 'w') as f:
    f.write(json.dumps('{"name":"aaa", "age":"10"}'))
    def upload_file_func():
    conn = GoogleCloudStorageHook()
    source_bucket = 'source_bucket'
    source_object = 'data/test.json'
    target_bucket = 'target_bucket'
    target_object = 'test.json'
    conn.copy(source_bucket, source_object, target_bucket, target_object)
    #conn.delete(source_bucket, source_object)
    with DAG('load_gcs_file', description='DAG', schedule_interval=None, start_date=datetime(2018, 11, 1)) as dag:
    create_file = PythonOperator(task_id='create_file', python_callable=write_file_func)
    copy_file = PythonOperator(task_id='copy_file', python_callable=upload_file_func)

    create_file >> copy_file


    注意:-) 请更改 source_bucket 名称值以反射(reflect)您的源存储桶名称。
    -) 请更改 target_bucket 名称值以反射(reflect)您的目标存储桶名称。
  • Airflow Hook 是外部库(例如 google.cloud.storage)的可重用接口(interface),因此许多不同的运算符(operator)可以以一致的方式与这些外部 API 对话。
    一个通用的示例是更新外部库时:不需要在使用外部库的每个地方都更新代码,只需要更改 Hook 代码。
  • 关于airflow - Cloud Composer 将文件写入其他存储桶问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59354598/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com