
airflow - Airflow dynamic DAG and task IDs

Reposted · Author: 行者123 · Updated: 2023-12-03 12:34:52

I have mostly seen Airflow used for ETL / Big Data work. I am trying to use it for business workflows, where a user action will trigger a set of dependent tasks. Some of those tasks may need to be cleared (deleted) based on certain other user actions.
I thought the best way to handle this would be dynamic task IDs. I read that Airflow supports dynamic DAG IDs, so I wrote a simple Python script that takes a DAG id and a task id as command-line arguments. However, I cannot get it to work: it fails with a "dag_id not found" error. Has anyone tried this? Here is the script (called tmp.py), which I run from the command line as `python tmp.py 820 2016-08-24T22:50:00`:

from __future__ import print_function
import os
import sys
from datetime import date, datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

execution = '2016-08-24T22:20:00'
if len(sys.argv) > 2:
    dagid = sys.argv[1]
    taskid = 'Activate' + sys.argv[1]
    execution = sys.argv[2]
else:
    dagid = 'DAGObjectId'
    taskid = 'Activate'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': date.today(),
    'email': ['fake@fake.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

dag = DAG(
    dag_id=dagid,
    default_args=default_args,
    schedule_interval='@once',
)
globals()[dagid] = dag

task1 = BashOperator(
    task_id=taskid,
    bash_command='ls -l',
    dag=dag)

fakeTask = BashOperator(
    task_id='fakeTask',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
task1.set_upstream(fakeTask)

airflowcmd = "airflow run " + dagid + " " + taskid + " " + execution
print("airflowcmd = " + airflowcmd)
os.system(airflowcmd)

Best answer

After a lot of trial and error I managed to figure it out; hopefully it helps someone. It works like this: you need an iterator or an external source (a file or a database table) to generate DAGs/tasks dynamically from a template. You can keep the DAG and task structure static and only assign their IDs dynamically, to distinguish one DAG from another. You put this Python script in the dags folder. When you start the Airflow scheduler, it runs the script on every heartbeat and writes the DAGs to the dag table in the database. If a DAG (unique dag_id) has already been written, it is simply skipped. The scheduler also looks at each DAG's schedule to determine which DAGs are ready to run; when a DAG is ready, it executes it and updates its state.
Here is some sample code:

from airflow.operators import PythonOperator
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import time

dagid = 'DA' + str(int(time.time()))
taskid = 'TA' + str(int(time.time()))

input_file = '/home/directory/airflow/textfile_for_dagids_and_schedule'

def my_sleeping_function(random_base):
    '''This is a function that will run within the DAG execution'''
    time.sleep(random_base)

def_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

with open(input_file, 'r') as f:
    for line in f:
        args = line.strip().split(',')
        if len(args) < 7:  # need an id plus year, month, day, hour, minute, second
            continue
        dagid = 'DAA' + args[0]
        taskid = 'TAA' + args[0]
        yyyy = int(args[1])
        mm = int(args[2])
        dd = int(args[3])
        hh = int(args[4])
        mins = int(args[5])
        ss = int(args[6])

        dag = DAG(
            dag_id=dagid,
            default_args=def_args,
            schedule_interval='@once',
            start_date=datetime(yyyy, mm, dd, hh, mins, ss),
        )
        # Register each DAG in module globals so the scheduler's DagBag
        # picks up every generated DAG, not just the last one in the loop.
        globals()[dagid] = dag

        myBashTask = BashOperator(
            task_id=taskid,
            bash_command='python /home/directory/airflow/sendemail.py',
            dag=dag)

        task2id = taskid + '-X'
        task_sleep = PythonOperator(
            task_id=task2id,
            python_callable=my_sleeping_function,
            op_kwargs={'random_base': 10},
            dag=dag)

        task_sleep.set_upstream(myBashTask)
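The format of the input text file is not shown in the answer; from the parsing loop it is presumably one comma-separated record per line, an ID followed by year, month, day, hour, minute, and second. A minimal sketch of how one such (hypothetical) line would map to a dag_id and start_date:

```python
from datetime import datetime

# Hypothetical sample line matching what the loop above parses:
# id,year,month,day,hour,minute,second
line = "820,2016,8,24,22,50,0"

args = line.strip().split(',')
dagid = 'DAA' + args[0]                              # -> 'DAA820'
start_date = datetime(*(int(x) for x in args[1:7]))  # -> 2016-08-24 22:50:00
print(dagid, start_date.isoformat())
```

Each distinct ID in the file yields a distinct dag_id, which is what lets the scheduler tell the generated DAGs apart.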

Regarding "airflow - Airflow dynamic DAG and task IDs", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/39133376/
