
triggers - Apache Airflow - Trigger/schedule DAG rerun on completion (file sensor)


Good morning.

I am trying to set up a DAG that will:

  1. Watch/sense for a file to hit a network folder
  2. Process the file
  3. Archive the file

Using online tutorials and Stack Overflow, I have been able to come up with the following DAG and operator, which successfully achieve the objective. However, I would like the DAG to be rescheduled or rerun on completion so that it starts watching/sensing for another file.

I attempted setting a variable max_active_runs: 1 and then a schedule_interval: timedelta(seconds=5). This does reschedule the DAG, but it starts queuing tasks and locks the file.

Any ideas on how I could rerun the DAG after the archive_task would be welcome.

Thank you.

DAG code:

from airflow import DAG
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator
from datetime import datetime, timedelta
from airflow.models import Variable

default_args = {
    'owner': 'glsam',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'provide_context': True,
    'retries': 100,
    'retry_delay': timedelta(seconds=30),
    # settings from the rescheduling attempt described above
    # (these are normally DAG-level arguments, not task defaults)
    'max_active_runs': 1,
    'schedule_interval': timedelta(seconds=5),
}

dag = DAG('test_sensing_for_a_file', default_args=default_args)

filepath = Variable.get("soucePath_Test")
filepattern = Variable.get("filePattern_Test")
archivepath = Variable.get("archivePath_Test")

sensor_task = OmegaFileSensor(
    task_id='file_sensor_task',
    filepath=filepath,
    filepattern=filepattern,
    poke_interval=3,
    dag=dag)


def process_file(**context):
    # Pull the file name that the sensor pushed to XCom.
    file_to_process = context['task_instance'].xcom_pull(
        key='file_name', task_ids='file_sensor_task')
    # Demo "processing": overwrite the file with test content.
    file = open(filepath + file_to_process, 'w')
    file.write('This is a test\n')
    file.write('of processing the file')
    file.close()


process_task = PythonOperator(
    task_id='process_the_file',
    python_callable=process_file,
    provide_context=True,
    dag=dag)

archive_task = ArchiveFileOperator(
    task_id='archive_file',
    filepath=filepath,
    archivepath=archivepath,
    dag=dag)

sensor_task >> process_task >> archive_task
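
This DAG reads three Airflow Variables that must exist in the metadata database before the first run. A minimal sketch for seeding them from Python (the paths and pattern below are hypothetical placeholders; the keys match the Variable.get calls above, including the "soucePath_Test" spelling):

from airflow.models import Variable

# Seed the Variables the DAG expects (values are placeholders).
Variable.set("soucePath_Test", "/mnt/share/incoming/")
Variable.set("filePattern_Test", r".*\.txt")
Variable.set("archivePath_Test", "/mnt/share/archive/")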

File sensor operator:

import os
import re

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators.sensors import BaseSensorOperator


class ArchiveFileOperator(BaseOperator):
    @apply_defaults
    def __init__(self, filepath, archivepath, *args, **kwargs):
        super(ArchiveFileOperator, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.archivepath = archivepath

    def execute(self, context):
        # Pull the file name the sensor pushed to XCom and move the file
        # from the watched folder into the archive folder.
        file_name = context['task_instance'].xcom_pull(
            'file_sensor_task', key='file_name')
        os.rename(self.filepath + file_name, self.archivepath + file_name)


class OmegaFileSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, filepath, filepattern, *args, **kwargs):
        super(OmegaFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.filepattern = filepattern

    def poke(self, context):
        file_pattern = re.compile(self.filepattern)

        # Succeed (and record the file name in XCom) as soon as any file
        # in the watched directory matches the pattern; otherwise keep poking.
        for file_name in os.listdir(self.filepath):
            if re.match(file_pattern, file_name):
                context['task_instance'].xcom_push('file_name', file_name)
                return True
        return False


class OmegaPlugin(AirflowPlugin):
    name = "omega_plugin"
    operators = [OmegaFileSensor, ArchiveFileOperator]
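
Before wiring the sensor into a DAG, the matching logic inside poke() can be exercised on its own. A small standalone sketch of that check (the directory and pattern are hypothetical):

import os
import re

def find_matching_file(path, pattern):
    # Mirrors OmegaFileSensor.poke(): return the first file name in `path`
    # matching `pattern`, or None if no file has arrived yet.
    compiled = re.compile(pattern)
    for name in os.listdir(path):
        if compiled.match(name):
            return name
    return None

print(find_matching_file('/mnt/share/incoming/', r'.*\.txt'))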

Best answer

Dmitri's method worked perfectly.
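
Dmitri's answer is not quoted in the post, but the idea is essentially to leave the DAG unscheduled and have the final task kick off the next run itself. A minimal sketch of that approach, assuming the Airflow 1.x CLI and the dag_id from the question (the task name is my own):

from airflow.operators.bash_operator import BashOperator

# Hypothetical reconstruction: shell out to the CLI to queue the next run
# of this same DAG as soon as archiving has finished.
trigger_next_run = BashOperator(
    task_id='trigger_next_run',
    bash_command="airflow trigger_dag 'test_sensing_for_a_file'",
    dag=dag)

sensor_task >> process_task >> archive_task >> trigger_next_run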

I also found through further reading that setting schedule_interval=None and then using the TriggerDagRunOperator worked equally well:

from airflow.operators.dagrun_operator import TriggerDagRunOperator

trigger = TriggerDagRunOperator(
    task_id='trigger_dag_RBCPV99_rerun',
    trigger_dag_id="RBCPV99_v2",  # queue a new run of DAG 'RBCPV99_v2' once archiving completes
    dag=dag)

sensor_task >> process_task >> archive_task >> trigger
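
For completeness, the matching DAG declaration disables the schedule so the DAG only runs when triggered. A minimal sketch, assuming "RBCPV99_v2" is this same DAG's dag_id:

dag = DAG(
    'RBCPV99_v2',
    default_args=default_args,
    schedule_interval=None)  # run only when explicitly triggered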

Regarding "triggers - Apache Airflow - Trigger/schedule DAG rerun on completion (file sensor)", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44770070/
