gpt4 book ai didi

python - EMR DAG 在所有步骤完成之前终止

转载 作者:行者123 更新时间:2023-12-05 08:07:49 34 4
gpt4 key购买 nike

我在我的 DAG 中使用 EMR CreateJobFlow、AddSteps、StepSensor 和 TerminateJobFlow 运算符来启动 EMR 集群、添加步骤(2 个 spark 应用程序和 dist-cp),并在所有步骤完成或 1 个失败时终止。当我有一个 2 步 DAG(第一个是 Spark 应用程序,第二个是 dist-cp)时,我能够做到这一点,但是,当我有 2 个 spark 应用程序时,集群成功运行了第一个步骤,并终止而没有继续第二和第三步。

通过一些挖掘,我可以看到 Airflow“戳”步骤以查看它们是否仍在运行。在这种情况下,它似乎认为它只有在完成 1 个步骤时才“成功”。

我的 spark 应用程序相当简单。第一个创建数据帧并将其写入本地 HDFS。第二个从 HDFS 读取数据并加入另一个数据集并写回 HDFS。第三步s3-dist-cp将数据从HDFS复制到s3。所有 3 个步骤都可以在 Spark-Shell 中以交互方式或作为 Spark-Submit 作业成功运行。我还自己克隆了 EMR 集群(没有 Airflow ),看到所有步骤都成功且没有任何错误,因此 EMR 和 Spark 不是这里的问题。

下面是DAG


from datetime import timedelta

import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator \
import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator \
import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator \
import EmrTerminateJobFlowOperator

DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2)
}

SPARK_TEST_STEPS = [
{
'Name': 'monthly_agg',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['spark-submit',
'--deploy-mode',
'cluster',
'--class' ,
'AggApp',
's3://jar1.jar' ]
}
},
{
'Name': 'monthly_agg2',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['spark-submit',
'--deploy-mode',
'cluster',
'--class' ,
'SimpleApp',
's3:/jar2.jar' ]
}
},

{
'Name': 'copy-data',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['s3-dist-cp',
'--src',
'/tempo',
'--dest',
's3://mydata/'
]
}
}
]

JOB_FLOW_OVERRIDES = {
'Instances': {'Ec2SubnetId': 'subnet-mysubnetid',
'InstanceGroups': [
{
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'r4.2xlarge',
'InstanceCount': 1
},
{
'Name': 'Slave nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'r4.2xlarge',
'InstanceCount': 8,
'EbsConfiguration': {'EbsBlockDeviceConfigs':[{'VolumeSpecification':{'SizeInGB':128,'VolumeType':'gp2'},'VolumesPerInstance':1}],'EbsOptimized':True}
}
]},
'Name':'airflow-monthly_agg_custom',
'Configurations': [
{
'Classification':'spark-defaults','Properties':
{'spark.sql.crossJoin.enabled':'true',
'spark.serializer':'org.apache.spark.serializer.KryoSerializer',
'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version':'2',
"maximizeResourceAllocation":"true"},
'Configurations':[]
},
{
'Classification':'spark-hive-site','Properties':
{'hive.metastore.client.factory.class':'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'},
'Configurations':[]
}
]}

dag = DAG(
'monthly_agg_custom',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=4),
schedule_interval='@once'
)

cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)

step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_TEST_STEPS,
dag=dag
)

step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)

cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)

最佳答案

问题是您将 EmrStepSensor 所有步骤作为一个 stepadder 提供,因此一旦完成,它就会终止集群。

解决方案是将所有步骤分开,并将最后一步的'id 赋予EmrStepSensor。或者,您可以仅将最后一步与其他步骤的单独步骤加法器 (step_adder_actual_step) 一起提供给 EmrStepSensor

step_adder_pre_step = EmrAddStepsOperator(
task_id='pre_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
aws_conn_id='aws_default',
steps=pre_step,
dag=dag
)

step_adder_actual_step = EmrAddStepsOperator(
task_id='actual_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
aws_conn_id='aws_default',
steps=actual_step,
dag=dag
)

step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('actual_step', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)

cluster_creator >> step_adder_pre_step >> step_adder_actual_step >> step_checker >> cluster_remover

关于python - EMR DAG 在所有步骤完成之前终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53048106/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com