
Running Airflow tasks in parallel


I am confused about how to get Airflow to run two tasks in parallel.

Here is my DAG:

import datetime as dt
from airflow import DAG
import os
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dagrun_operator import TriggerDagRunOperator

scriptAirflow = '/home/alexw/scriptAirflow/'
uploadPath='/apps/man-data/data/to_load/'
receiptPath= '/apps/man-data/data/to_receipt/'

def result():
    # choose the branch to follow based on the files present in receiptPath
    if os.listdir(receiptPath):
        for files in os.listdir(receiptPath):
            if files.startswith('MEM') and files.endswith('.csv'):
                print('Launching script for: ' + files)
                return 'mem_script'
            elif files.startswith('FMS') and files.endswith('.csv'):
                return 'fms_script'
    else:
        print('No script to launch')
        return "no_script"

def onlyCsvFiles():
    if os.listdir(uploadPath):
        for files in os.listdir(uploadPath):
            # parentheses are needed here: 'and' binds tighter than 'or'
            if (files.startswith('MEM') or files.startswith('FMS')) and files.endswith('.csv'):
                return 'move_good_file'
            else:
                return 'move_bad_file'

default_args = {
    'owner': 'testingA',
    'start_date': dt.datetime(2020, 2, 17),
    'retries': 1,
}

dag = DAG('tryingAirflow', default_args=default_args, description='airflow20',
          schedule_interval=None, catchup=False)

file_sensor = FileSensor(
    task_id="file_sensor",
    filepath=uploadPath,
    fs_conn_id='airflow_db',
    poke_interval=10,
    dag=dag,
)

onlyCsvFiles = BranchPythonOperator(
    task_id='only_csv_files',
    python_callable=onlyCsvFiles,
    trigger_rule='none_failed',
    dag=dag,
)

move_good_file = BashOperator(
    task_id="move_good_file",
    bash_command='python3 ' + scriptAirflow + 'movingGoodFiles.py "{{ execution_date }}"',
    dag=dag,
)

move_bad_file = BashOperator(
    task_id="move_bad_file",
    bash_command='python3 ' + scriptAirflow + 'movingBadFiles.py "{{ execution_date }}"',
    dag=dag,
)

result_mv = BranchPythonOperator(
    task_id='result_mv',
    python_callable=result,
    trigger_rule='none_failed',
    dag=dag,
)

run_Mem_Script = BashOperator(
    task_id="mem_script",
    bash_command='python3 ' + scriptAirflow + 'memShScript.py "{{ execution_date }}"',
    dag=dag,
)

run_Fms_Script = BashOperator(
    task_id="fms_script",
    bash_command='python3 ' + scriptAirflow + 'fmsScript.py "{{ execution_date }}"',
    dag=dag,
)

skip_script = BashOperator(
    task_id="no_script",
    bash_command="echo No script to launch",
    dag=dag,
)

rerun_dag = TriggerDagRunOperator(
    task_id='rerun_dag',
    trigger_dag_id='tryingAirflow',
    trigger_rule='none_failed',
    dag=dag,
)

onlyCsvFiles.set_upstream(file_sensor)
move_good_file.set_upstream(onlyCsvFiles)
move_bad_file.set_upstream(onlyCsvFiles)
result_mv.set_upstream(move_good_file)
result_mv.set_upstream(move_bad_file)
run_Fms_Script.set_upstream(result_mv)
run_Mem_Script.set_upstream(result_mv)
skip_script.set_upstream(result_mv)
rerun_dag.set_upstream(run_Fms_Script)
rerun_dag.set_upstream(run_Mem_Script)
rerun_dag.set_upstream(skip_script)

When result_mv comes to choose which task to run, if both need to be called, it only executes one task and skips the other.

I want to execute both tasks at the same time when that is necessary. This is with my airflow.cfg. The question is: how can I run tasks in parallel (or not run them, if they are not needed) when using a BranchPythonOperator?
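For context, whether two tasks can actually run at the same time also depends on the executor settings in airflow.cfg: the default SequentialExecutor (used with SQLite) runs only one task instance at a time. A minimal sketch of the settings that control this, with illustrative values rather than the ones from the question:

[core]
# LocalExecutor (or Celery/Kubernetes) is required for parallel task instances,
# and it needs a real metadata database rather than SQLite
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
# upper bounds on how many task instances may run concurrently
parallelism = 32
dag_concurrency = 16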

Thanks for your help!

Best answer

If you want to be sure that either both scripts run or neither does, I would add a dummy task before the two tasks that have to run in parallel. When you use a BranchPythonOperator, Airflow will always pick exactly one branch to execute.

I would make these changes:

# import the DummyOperator
from airflow.operators.dummy_operator import DummyOperator

# modify the returns of the function result(): any matching file routes to the
# single 'run_scripts' branch, everything else falls through to 'no_script'
def result():
    if os.listdir(receiptPath):
        for files in os.listdir(receiptPath):
            if (files.startswith('MEM') and files.endswith('.csv') or
                    files.startswith('FMS') and files.endswith('.csv')):
                return 'run_scripts'
    print('No script to launch')
    return "no_script"

# add the dummy task
run_scripts = DummyOperator(
    task_id="run_scripts",
    dag=dag
)

# add dependency
run_scripts.set_upstream(result_mv)

# CHANGE two of the dependencies to
run_Fms_Script.set_upstream(run_scripts)
run_Mem_Script.set_upstream(run_scripts)

I must admit I have never worked with the LocalExecutor handling parallel tasks, but this should make sure that both tasks run in the case where you want to run the scripts.

Edit:

If you want to run neither script, exactly one of the two, or both, I think the easiest way is to create another task that runs both scripts in parallel from bash (or at least launches them together with &). I would do something like this:
# modify the returns of the function result() so that it chooses
# between 4 different outcomes
def result():
    if os.listdir(receiptPath):
        mem_flag = False
        fms_flag = False
        for files in os.listdir(receiptPath):
            if files.startswith('MEM') and files.endswith('.csv'):
                mem_flag = True
            if files.startswith('FMS') and files.endswith('.csv'):
                fms_flag = True
        if mem_flag and fms_flag:
            return "both_scripts"
        elif mem_flag:
            return "mem_script"
        elif fms_flag:
            return "fms_script"
        else:
            return "no_script"
    else:
        print('No script to launch')
        return "no_script"

# add the 'run both scripts' task; its task_id must match the "both_scripts"
# string returned by result(), otherwise the branch has no task to follow
run_both_scripts = BashOperator(
    task_id="both_scripts",
    bash_command='python3 ' + scriptAirflow + 'memShScript.py "{{ execution_date }}" & python3 ' + scriptAirflow + 'fmsScript.py "{{ execution_date }}" &',
    dag=dag,
)

# add dependency
run_both_scripts.set_upstream(result_mv)
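One caveat, added here as a side note rather than part of the original answer: with the trailing &, both scripts run in the background and the shell exits immediately, so the BashOperator can be marked successful before either script finishes, and a failing script will not fail the task. A sketch of a variant that waits on both processes and propagates failure, assuming the same script paths:

# background both scripts, record their PIDs, then wait on each PID so the
# task only succeeds if both scripts exit with status 0
run_both_scripts = BashOperator(
    task_id="both_scripts",
    bash_command=(
        'python3 ' + scriptAirflow + 'memShScript.py "{{ execution_date }}" & MEM_PID=$!; '
        'python3 ' + scriptAirflow + 'fmsScript.py "{{ execution_date }}" & FMS_PID=$!; '
        'wait $MEM_PID && wait $FMS_PID'
    ),
    dag=dag,
)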

As for running Airflow tasks in parallel, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/60603855/
