gpt4 book ai didi

python - 如何在 Airflow 中使用 s3 hook

转载 作者:行者123 更新时间:2023-12-03 17:43:43 25 4
gpt4 key购买 nike

我有一个 s3 文件夹位置,我要移动到 GCS。
我正在使用 Airflow 使运动发生。

在这种环境下,我的 s3 是一个“不断增长”的文件夹,这意味着我们在获取文件后不会删除它们。

def GetFiles(**kwargs):
foundfiles = False

s3 = S3Hook(aws_conn_id='S3_BDEX')
s3.get_conn()
bucket = s3.get_bucket(
bucket_name='/file.share.external.bdex.com/Offrs'
)
files = s3.list_prefixes(bucket_name='/file.share.external.bdex.com/Offrs')
print("BUCKET: {}".format(files))


check_for_file = BranchPythonOperator(
task_id='Check_FTP_and_Download',
provide_context=True,
python_callable=GetFiles,
dag=dag
)

我在这里需要的是文件列表及其创建日期/时间。通过这种方式,我可以比较现有文件以确定它们是否是新文件。

我知道我可以连接,因为函数 get_bucket功能起作用了。
但是,在这种情况下,我收到以下错误:
Invalid bucket name "/file.share.external.bdex.com/Offrs": Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$"

谢谢

最佳答案

  • 存储桶名称错误。如果 url 是 s3://something/path/to/file,则存储桶名称是“something”。
  • 关于python - 如何在 Airflow 中使用 s3 hook,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60201640/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com