
python - Trying to create dynamic subdags from a parent dag based on an array of file names


I am trying to use Airflow to move s3 files from a "non-delete" bucket (meaning I can't delete the files) to GCS. I can't guarantee that new files will be there every day, but I must check for new files every day.

My problem is the dynamic creation of the subdags. If there ARE files, I need subdags. If there are NOT files, I don't need subdags. My problem is the upstream/downstream setup. In my code, it does detect the files, but it does not kick off the subdags as expected. I'm missing something.

Here is my code:

from airflow import models
from airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging

args = {
    'owner': 'Airflow',
    'start_date': dates.days_ago(1),
    'email': ['sinistersparrow1701@gmail.com'],
    'email_on_failure': True,
    'email_on_success': True,
}

bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []

parent_dag = models.DAG(
    dag_id='My_Ingestion',
    default_args=args,
    schedule_interval='@daily',
    catchup=False
)

def Check_For_Files(**kwargs):
    s3 = S3Hook(aws_conn_id='S3_BOX')
    s3.get_conn()
    bucket = bucket
    LastBDEXDate = int(Variable.get("last_publish_date"))
    maxdate = LastBDEXDate
    files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
    for file in files:
        print(file)
        print(file.split("_")[-2])
        print(file.split("_")[-2][-8:])  ## proves I can see a date in the file name is ok.
        maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
    if maxdate > LastBDEXDate:
        return 'Start_Process'
    return 'finished'

def create_subdag(dag_parent, dag_id_child_prefix, file_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)

    # dag
    subdag = models.DAG(dag_id=dag_id_child,
                        default_args=args,
                        schedule_interval=None)

    # operators
    s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
        task_id=dag_id_child,
        bucket=bucket,
        prefix=file_name,
        dest_gcs_conn_id='GCP_Account',
        dest_gcs='gs://my_files/To_Process/',
        replace=False,
        gzip=True,
        dag=subdag)

    return subdag

def create_subdag_operator(dag_parent, filename, index):
    tid_subdag = 'file_{}'.format(index)
    subdag = create_subdag(dag_parent, tid_subdag, filename)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

def create_subdag_operators(dag_parent, file_list):
    subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
    # chain subdag-operators together
    chain(*subdags)
    return subdags

check_for_files = BranchPythonOperator(
    task_id='Check_for_s3_Files',
    provide_context=True,
    python_callable=Check_For_Files,
    dag=parent_dag
)

finished = DummyOperator(
    task_id='finished',
    dag=parent_dag
)

decision_to_continue = DummyOperator(
    task_id='Start_Process',
    dag=parent_dag
)

if len(files) > 0:
    subdag_ops = create_subdag_operators(parent_dag, files)
    check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished


check_for_files >> finished

Best answer

Below is the recommended way to create dynamic DAGs or sub-DAGs in Airflow. There are other ways too, but I guess this one would be largely applicable to your problem.

First, create a file (yaml/csv) that includes the list of all the s3 files and their locations. In your case you wrote a function to store them in a list; I would say store them in a separate yaml file instead, load it at run time in the Airflow environment, and then create the DAG.

Below is a sample yaml file: dynamicDagConfigFile.yaml

job: dynamic-dag
bucket_name: 'bucket-name'
prefix: 'bucket-prefix'
S3Files:
  - File1: 'S3Loc1'
  - File2: 'S3Loc2'
  - File3: 'S3Loc3'

You can modify your Check_For_Files function to store the keys in that yaml file instead of a Python list.
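For example, a minimal sketch of what that change could look like, reusing the connection id, bucket and prefix from your code and the yaml layout above (the config path and the File1/File2 field names are assumptions, and the date comparison from your original function is left out for brevity):

import yaml
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def Check_For_Files(**kwargs):
    # List the candidate keys exactly as in the original function.
    s3 = S3Hook(aws_conn_id='S3_BOX')
    keys = s3.list_keys(bucket_name='mybucket', prefix='myprefix/file') or []

    # Dump them into the yaml config file, mirroring the sample layout above
    # (path and field names are assumptions for illustration).
    config = {
        'job': 'dynamic-dag',
        'bucket_name': 'mybucket',
        'prefix': 'myprefix/',
        'S3Files': [{'File{}'.format(i + 1): key} for i, key in enumerate(keys)],
    }
    with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml', 'w') as f:
        yaml.safe_dump(config, f, default_flow_style=False)

    # Keep the branching behaviour: continue only if something was found.
    return 'Start_Process' if keys else 'finished'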

Now we can move on to creating the DAG dynamically.

First define two tasks using dummy operators, i.e. a start and an end task. These are the tasks we will build our DAG upon by dynamically creating tasks between them:
start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag
)

Dynamic DAGs:
We will use PythonOperators in Airflow. The function should receive as arguments the task id, a python function to be executed (i.e. the python_callable for the PythonOperator), and a set of args to be used during the execution.

Including the task id as an argument lets us exchange data among the dynamically generated tasks, e.g. via XCom.

You can specify your operation function within this dynamic dag, for example s3_to_gcs_op.
def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string,
        # while the python_callable argument for PythonOperator only accepts
        # objects of type callable, not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task

Finally, based on the locations present in the yaml file, you can create the dynamic tasks. First read the yaml file as below, then create the tasks:
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead of load to parse the YAML file
    configFile = yaml.safe_load(f)

# Extract the file list
S3Files = configFile['S3Files']

# In this loop tasks are created for each file defined in the YAML file
for S3File in S3Files:
    for S3File, fieldName in S3File.items():

        # Remember the task id is provided in order to exchange data among the dynamically generated tasks.
        get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File),
                                        'getS3Data',
                                        {})  # your configs here.

        # Second step: upload from S3 to GCS.
        upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File),
                                           'uploadDataS3ToGCS',
                                           {'previous_task_id': '{}-getS3Data'.format(S3File)})
        # write your configs again here, e.g. the S3 bucket name and prefix (or read them from the yaml file) and the GCS config.


Final DAG definition:

The idea is that:

# once the tasks are generated, they should be linked with the
# dummy operators generated for the start and end tasks.
start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end
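Note that the answer passes 'getS3Data' and 'uploadDataS3ToGCS' to createDynamicDAG as strings but never shows their bodies. Purely as an illustration, and assuming the connection ids and the GCS destination from your question ('S3_BOX', 'GCP_Account', gs://my_files/To_Process/), they could look roughly like the sketch below: the first task returns the matching keys (the PythonOperator pushes its return value to XCom) and the second pulls them via previous_task_id:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook

def getS3Data(bucket_name, prefix, **context):
    # The returned list is pushed to XCom under this task's task_id.
    s3 = S3Hook(aws_conn_id='S3_BOX')
    return s3.list_keys(bucket_name=bucket_name, prefix=prefix) or []

def uploadDataS3ToGCS(bucket_name, previous_task_id, **context):
    # Pull the key list pushed by the upstream get task, then copy each object to GCS.
    keys = context['ti'].xcom_pull(task_ids=previous_task_id) or []
    s3 = S3Hook(aws_conn_id='S3_BOX')
    gcs = GCSHook(gcp_conn_id='GCP_Account')
    for key in keys:
        obj = s3.get_key(key, bucket_name=bucket_name)
        # Destination mirrors the question's dest_gcs='gs://my_files/To_Process/'.
        gcs.upload(bucket_name='my_files',
                   object_name='To_Process/{}'.format(key),
                   data=obj.get()['Body'].read())

Since the callable names arrive as strings, createDynamicDAG resolves them with eval; if you define the functions in the same DAG file, you could just as well pass the function objects directly and drop the eval.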

The complete Airflow code, in order:
import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

# NOTE: the answer never shows the DAG object itself; a minimal definition is
# assumed here (dag_id and start_date are placeholders) so that the snippet parses.
dag = DAG(
    dag_id='dynamic-dag',
    start_date=datetime(2020, 2, 1),
    schedule_interval='@daily',
)

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string,
        # while the python_callable argument for PythonOperator only accepts
        # objects of type callable, not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag
)


with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    configFile = yaml.safe_load(f)

# Extract the file list
S3Files = configFile['S3Files']

# In this loop tasks are created for each file defined in the YAML file
for S3File in S3Files:
    for S3File, fieldName in S3File.items():

        # Remember the task id is provided in order to exchange data among the dynamically generated tasks.
        get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File),
                                        'getS3Data',
                                        {})  # your configs here.

        # Second step: upload from S3 to GCS.
        upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File),
                                           'uploadDataS3ToGCS',
                                           {'previous_task_id': '{}-getS3Data'.format(S3File)})
        # write your configs again here, e.g. the S3 bucket name and prefix (or read them from the yaml file) and the GCS config.

        # Once the tasks are generated, link them with the start and end dummy operators.
        start >> get_s3_files
        get_s3_files >> upload_s3_toGCS
        upload_s3_toGCS >> end

Regarding "python - Trying to create dynamic subdags from a parent dag based on an array of file names", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60270233/
