gpt4 book ai didi

google-cloud-functions - 从 Cloud Function 的文件到达事件触发 Composer DAG 上的任务

转载 作者:行者123 更新时间:2023-12-05 09:10:23 25 4
gpt4 key购买 nike

我可以从云功能触发 Airflow 任务吗?

基本上我的问题是这样的。我有一些文件到达谷歌云存储。同一个 DAG 中的多个文件。我需要在文件到达时触发转换作业。我在考虑使用云功能。但是我的 DAG 中有很多依赖作业。

感谢任何帮助

最佳答案

您不一定需要 Cloud Function 来感知 GCS 中的文件,Composer 具有可用于实现此目的的 GCS 传感器。

假设您必须监控 bucket/folder/file_*.csv 中的文件,然后:

from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
import datetime as dt
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

lasthour = dt.datetime.now() - dt.timedelta(hours=1)

args = {
'owner': 'airflow',
'start_date': lasthour,
'depends_on_past': False,
}
dag = DAG(
dag_id='GCS_sensor_dag',
schedule_interval=None,
default_args=args
)
GCS_File_list = GoogleCloudStorageListOperator(
task_id= 'list_Files',
bucket= 'bucketname',
prefix='folder/file_',
delimiter='.csv',
google_cloud_storage_conn_id='google_cloud_default',
dag = dag
)
file_sensor = GoogleCloudStoragePrefixSensor(
task_id='gcs_polling',
bucket='bucketname',
prefix='folder/file_',
dag=dag
)

trigger = TriggerDagRunOperator(
task_id='trigger_dag_{timestamp}_rerun'.format(timestamp=((dt.datetime.now() - dt.datetime.utcfromtimestamp(0)).total_seconds()*1000)),
trigger_dag_id="GCS_sensor_dag",
dag=dag
)

file_sensor >> GCS_File_list >> trigger

关于google-cloud-functions - 从 Cloud Function 的文件到达事件触发 Composer DAG 上的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61516732/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com