java - What is the best practice for monitoring the progress of an AWS EMR job?

Reposted · Author: 搜寻专家 · Updated: 2023-10-31 20:11:03

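I have the following code, which runs an EMR job successfully. I also want to monitor the job's running status. I tried the DescribeJobFlows API, but according to http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/AmazonElasticMapReduceClient.html this API is deprecated.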

Can anyone explain the best practice for monitoring the progress of an EMR run?

import java.util.Arrays;
import java.util.List;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.ec2.model.InstanceType; // EC2 enum: C1Medium.toString() is "c1.medium"
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.*;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

public class EmrJobRunner {
    public static void main(String[] args) {
        // The jar takes [input_file_path, output_directory]; output_directory must not exist yet,
        // so a timestamp is appended to make it unique per run
        String inputFilePath = "s3://mybucket/emr/input";
        String outputDirectory = "s3://mybucket/emr/output/" + System.currentTimeMillis();
        String jarName = "WordCount.jar";
        String jarPath = "s3://mybucket/emr/" + jarName;
        String logPath = "s3://mybucket/emr/logs";

        // ActionOnFailure values for the steps below
        String TERMINATE_JOB_FLOW = "TERMINATE_JOB_FLOW";
        String CONTINUE = "CONTINUE";

        // Placeholder keys; real credentials should not be hard-coded
        AWSCredentials credentials = new BasicAWSCredentials("pub_key", "sec_key");
        StepFactory stepFactory = new StepFactory();

        AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
        emr.setRegion(Region.getRegion(Regions.AP_SOUTHEAST_1));

        StepConfig enableDebugging = new StepConfig()
                .withName("Enable debugging")
                .withActionOnFailure(TERMINATE_JOB_FLOW)
                .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

        StepConfig installHive = new StepConfig()
                .withName("Install Hive")
                .withActionOnFailure(TERMINATE_JOB_FLOW)
                .withHadoopJarStep(stepFactory.newInstallHiveStep());

        StepConfig runScript = new StepConfig()
                .withName("Run Script")
                .withActionOnFailure(CONTINUE)
                .withHadoopJarStep(stepFactory.newRunHiveScriptStep("s3://dummy/dummy.hive"));

        // Custom jar step: passes the input path and output directory as arguments
        List<String> jarArgs = Arrays.asList(inputFilePath, outputDirectory);
        HadoopJarStepConfig jarCfg = new HadoopJarStepConfig()
                .withJar(jarPath)
                .withArgs(jarArgs);
        StepConfig runJar = new StepConfig()
                .withName(jarName)
                .withActionOnFailure(TERMINATE_JOB_FLOW)
                .withHadoopJarStep(jarCfg);

        // KeepJobFlowAliveWhenNoSteps(false): the cluster shuts down after the last step
        JobFlowInstancesConfig instanceCfg = new JobFlowInstancesConfig()
                .withKeepJobFlowAliveWhenNoSteps(false)
                .withTerminationProtected(true)
                .withInstanceCount(3)
                .withMasterInstanceType(InstanceType.C1Medium.toString())
                .withSlaveInstanceType(InstanceType.C1Medium.toString())
                .withHadoopVersion("2.4.0");

        List<StepConfig> steps = Arrays.asList(enableDebugging, installHive, runScript, runJar);

        RunJobFlowRequest request = new RunJobFlowRequest()
                .withName("My EMR Job Flow")
                .withAmiVersion("3.3.2")
                .withInstances(instanceCfg)
                .withLogUri(logPath)
                .withSteps(steps);

        RunJobFlowResult result = emr.runJobFlow(request);
        // DescribeJobFlows is deprecated, so this status check is left commented out:
        // DescribeJobFlowsResult jobFlowDescResult = emr.describeJobFlows(
        //         new DescribeJobFlowsRequest().withJobFlowIds(result.getJobFlowId()));
    }
}
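Cluster state only tells you whether the cluster as a whole is still up; the four steps queued above (debugging, Hive install, Hive script, jar) each move through their own states. As a minimal sketch that is not part of the original question, per-step progress can be summarized from a list of step-state strings. `StepProgress` is a hypothetical helper, and the sketch assumes EMR's step states are PENDING, CANCEL_PENDING, RUNNING, COMPLETED, CANCELLED, FAILED and INTERRUPTED, with the last four being final.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical helper: summarizes EMR step states into a one-line progress report. */
final class StepProgress {
    /** Step states assumed final; PENDING, CANCEL_PENDING and RUNNING mean the step is still in flight. */
    static final Set<String> FINISHED = Collections.unmodifiableSet(new HashSet<>(
            Arrays.asList("COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED")));

    /** Counts steps per state; TreeMap keeps the output in alphabetical order. */
    static Map<String, Integer> countByState(List<String> stepStates) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String state : stepStates) {
            counts.merge(state, 1, Integer::sum);
        }
        return counts;
    }

    /** Formats e.g. "2/4 steps finished {COMPLETED=2, PENDING=1, RUNNING=1}". */
    static String summarize(List<String> stepStates) {
        long finished = stepStates.stream().filter(FINISHED::contains).count();
        return finished + "/" + stepStates.size() + " steps finished " + countByState(stepStates);
    }
}
```

With the AWS SDK for Java v1, the input list could be built from `emr.listSteps(new ListStepsRequest().withClusterId(result.getJobFlowId())).getSteps()`, mapping each `StepSummary` through `getStatus().getState()`; longer step lists are paginated via `ListStepsResult.getMarker()`.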

Best answer

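Since DescribeJobFlows is deprecated, an alternative way to monitor the job's progress is to poll the cluster's status with DescribeCluster and stop once the cluster has terminated.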

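// Needs com.amazonaws.services.elasticmapreduce.model.{Cluster, ClusterState, DescribeClusterRequest,
// DescribeClusterResult, RunJobFlowResult} and java.util.concurrent.TimeUnit
RunJobFlowResult runJobResult = emr.runJobFlow(request);
System.out.printf("Run JobFlowId is: %s%n", runJobResult.getJobFlowId());

// Poll the cluster every 30 seconds until it reaches a final state
while (true) {
    DescribeClusterRequest desc = new DescribeClusterRequest()
            .withClusterId(runJobResult.getJobFlowId());
    DescribeClusterResult clusterResult = emr.describeCluster(desc);
    Cluster cluster = clusterResult.getCluster();
    String status = cluster.getStatus().getState();
    System.out.printf("Status: %s%n", status);
    if (status.equals(ClusterState.TERMINATED.toString())
            || status.equals(ClusterState.TERMINATED_WITH_ERRORS.toString())) {
        break;
    }
    try {
        TimeUnit.SECONDS.sleep(30);
    } catch (InterruptedException e) {
        // Restore the interrupt flag and stop polling instead of swallowing the interrupt
        Thread.currentThread().interrupt();
        break;
    }
    // other handling can go here
}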
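The loop above has no upper bound on how long it waits. A sketch of the same polling idea with a cap on the number of polls, assuming the status lookup is passed in as a `Supplier<String>` so the loop can be exercised without AWS. `StatusPoller`, `Sleeper` and `CLUSTER_DONE` are hypothetical names introduced here, not part of the SDK.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical helper: polls a status source until a terminal state or the poll limit is reached. */
final class StatusPoller {
    /** Final cluster states, matching the check in the answer's loop. */
    static final Set<String> CLUSTER_DONE = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("TERMINATED", "TERMINATED_WITH_ERRORS")));

    /** Waits between polls; Thread::sleep in production, a no-op in tests. */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    private final Supplier<String> status;
    private final Set<String> terminalStates;
    private final Sleeper sleeper;

    StatusPoller(Supplier<String> status, Set<String> terminalStates, Sleeper sleeper) {
        this.status = status;
        this.terminalStates = terminalStates;
        this.sleeper = sleeper;
    }

    /**
     * Polls at most maxPolls times. Returns the first terminal state seen, or
     * Optional.empty() if the limit is reached or the thread is interrupted.
     */
    Optional<String> await(int maxPolls, long intervalMillis) {
        for (int poll = 1; poll <= maxPolls; poll++) {
            String state = status.get();
            System.out.printf("Status: %s%n", state);
            if (terminalStates.contains(state)) {
                return Optional.of(state);
            }
            if (poll < maxPolls) { // no need to sleep after the last poll
                try {
                    sleeper.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }
}
```

Wired to EMR with SDK v1, the supplier could be `() -> emr.describeCluster(new DescribeClusterRequest().withClusterId(runJobResult.getJobFlowId())).getCluster().getStatus().getState()`, with `Thread::sleep` as the sleeper and a 30 000 ms interval as in the answer; 120 polls at that interval caps the wait at roughly an hour. Returning `Optional.empty()` on timeout leaves the caller to decide whether to keep waiting or terminate the cluster.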

For more on "java - What is the best practice for monitoring the progress of an AWS EMR job?", see the similar question on Stack Overflow: https://stackoverflow.com/questions/28364723/
