gpt4 book ai didi

python - Airflow EMR 从传感器执行步骤

转载 作者:太空宇宙 更新时间:2023-11-03 14:04:20 24 4
gpt4 key购买 nike

我在 Airflow 中制作了以下 DAG,我正在执行一组 EMRSteps 来运行我的管道。

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 07, 20, 10, 00),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=2),
}

dag = DAG('dag_import_match_hourly',
default_args=default_args,
description='Fancy Description',
schedule_interval=timedelta(hours=1),
dagrun_timeout=timedelta(hours=2))

try:
merge_s3_match_step = EmrAddStepsOperator(
task_id='merge_s3_match_step',
job_flow_id=cluster_id,
aws_conn_id='aws_default',
steps=create_step('Merge S3 Match'),
dag=dag
)

mapreduce_step = EmrAddStepsOperator(
task_id='mapreduce_match_step',
job_flow_id=cluster_id,
aws_conn_id='aws_default',
steps=create_step('MapReduce Match Hourly'),
dag=dag
)

merge_hdfs_step = EmrAddStepsOperator(
task_id='merge_hdfs_step',
job_flow_id=cluster_id,
aws_conn_id='aws_default',
steps=create_step('Merge HDFS Match Hourly'),
dag=dag
)

## Sensors
check_merge_s3 = EmrStepSensor(
task_id='watch_merge_s3',
job_flow_id=cluster_id,
step_id="{{ task_instance.xcom_pull('merge_s3_match_step', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)

check_mapreduce = EmrStepSensor(
task_id='watch_mapreduce',
job_flow_id=cluster_id,
step_id="{{ task_instance.xcom_pull('mapreduce_match_step', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)

check_merge_hdfs = EmrStepSensor(
task_id='watch_merge_hdfs',
job_flow_id=cluster_id,
step_id="{{ task_instance.xcom_pull('merge_hdfs_step', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)

mapreduce_step.set_upstream(merge_s3_match_step)
merge_s3_match_step.set_downstream(check_merge_s3)

mapreduce_step.set_downstream(check_mapreduce)

merge_hdfs_step.set_upstream(mapreduce_step)
merge_hdfs_step.set_downstream(check_merge_hdfs)

except AirflowException as ae:
print ae.message

DAG 工作正常,但我想使用传感器来确保当且仅当 EMR 作业已正确完成时我才会执行下一步。我尝试了一些东西,但都没有用。上面的代码不能正确完成工作。有人知道如何使用 EMRSensorStep 来实现我的目标吗?

最佳答案

看起来你的EmrStepSensor任务需要设置正确的依赖,例如check_mapreduce,如果你想等待check_mapreduce完成,下一步应该是merge_hdfs_step.set_upstream(check_mapreduce)或者check_mapreduce.set_downstream(merge_hdfs_step)。所以应该是TaskA>>SensorA>>TaskB>>SensorB>>TaskC>>SensorC,尝试用这种方式设置依赖

关于python - Airflow EMR 从传感器执行步骤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45266261/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com