gpt4 book ai didi

java - 从 Java SDK 运行 Amazon EMR 作业

转载 作者:搜寻专家 更新时间:2023-11-01 02:49:03 26 4
gpt4 key购买 nike

我正在尝试通过 Java SDK 运行 EMR 作业。

但它根本没有启动。

我正在粘贴我正在使用的代码。

我也看了 documentation,但这并没有太大帮助。

     package com.zedo.aws.emr;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

/**
 * Launches an Amazon EMR job flow that enables debugging, installs Hive,
 * and runs a Hive script stored on S3.
 *
 * <p>NOTE(review): placeholders ({@code <my key>}, {@code <my secret key>},
 * {@code s3://<path to script>}) must be replaced with real values before running.
 */
public class ExampleEMR {

    /**
     * Entry point: builds the step configuration and submits the job flow.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        AWSCredentials credentials = new BasicAWSCredentials("<my key>", "<my secret key>");
        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);

        StepFactory stepFactory = new StepFactory();

        // Turn on EMR debugging so step logs are browsable in the console.
        StepConfig enableDebugging = new StepConfig()
                .withName("Enable Debugging")
                .withActionOnFailure("TERMINATE_JOB_FLOW")
                .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

        // Install Hive on the cluster before the script step runs.
        StepConfig installHive = new StepConfig()
                .withName("Install Hive")
                .withActionOnFailure("TERMINATE_JOB_FLOW")
                .withHadoopJarStep(stepFactory.newInstallHiveStep());

        // The actual Hive script to execute, read from S3.
        StepConfig hiveScript = new StepConfig()
                .withName("Hive Script")
                .withActionOnFailure("TERMINATE_JOB_FLOW")
                .withHadoopJarStep(stepFactory.newRunHiveScriptStep("s3://<path to script>"));

        RunJobFlowRequest request = new RunJobFlowRequest()
                .withName("Hive Interactive")
                // FIX: the original code built hiveScript but never submitted it,
                // so the script step was silently skipped.
                .withSteps(enableDebugging, installHive, hiveScript)
                .withLogUri("s3://myawsbucket/")
                .withInstances(new JobFlowInstancesConfig()
                        .withEc2KeyName("<my key>")
                        .withHadoopVersion("0.20")
                        .withInstanceCount(5)
                        .withKeepJobFlowAliveWhenNoSteps(true)
                        .withMasterInstanceType("m1.small")
                        .withSlaveInstanceType("m1.small"));

        RunJobFlowResult result = emr.runJobFlow(request);
        // Surface the id so the caller can track the cluster (the original
        // discarded the result, giving no feedback that the job started).
        System.out.println("Started job flow: " + result.getJobFlowId());
    }

}

或者有人可以给我一些示例链接吗?

最佳答案

这对我有用:

public void runScriptClientes(Calendar executionDate) {

    // Build S3 credentials from the resource bundle.
    BasicAWSCredentials awsCreds = new BasicAWSCredentials(rb.getString("awsAccessKey"),
            rb.getString("awsSecretKey"));

    // EMR client pointed at the region the cluster lives in.
    AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(awsCreds);
    emr.setRegion(Region.getRegion(Regions.EU_WEST_1));

    // Work out which S3 folders need processing for this execution date.
    Map<String, FolderS3> s3DataToProcessInput = getRutasInput(executionDate);

    for (Entry<String, FolderS3> bucket_ : s3DataToProcessInput.entrySet()) {
        String nameBucket = bucket_.getKey();
        FolderS3 folderS3 = bucket_.getValue();
        // Skip folders that do not actually exist in the bucket.
        if (folderS3.getExistInBucket()) {
            listaConcurrente.add(folderS3);
            StepFactory stepFactory = new StepFactory();

            // Hive step: CONTINUE on failure so one bad folder does not
            // terminate the shared cluster.
            StepConfig stepHive = new StepConfig()
                    .withName(rb.getString("nameStepClientesS3") + ":" + nameBucket)
                    .withActionOnFailure(ActionOnFailure.CONTINUE)
                    .withHadoopJarStep(
                            stepFactory.newRunHiveScriptStep(rb.getString("scriptClienteS3"),
                                    "-d", "s3DataToProcess=s3://" + rb.getString("bucketPropio") + "/" + rb.getString("ruta_input_c1") + folderS3.getNameKey(),
                                    "-d", "s3DataToProcessOut=s3://" + rb.getString("bucketPropioOUT") + "/" + rb.getString("ruta_output_c1") + folderS3.getOutputFolder(),
                                    "-d", "windowTime=tablaparametro"));

            // Attach the step to an already-running cluster (jobflowID).
            AddJobFlowStepsRequest jobFlow = new AddJobFlowStepsRequest().withJobFlowId(rb.getString("jobflowID"))
                    .withSteps(stepHive);

            AddJobFlowStepsResult result = emr.addJobFlowSteps(jobFlow);
            List<String> id = result.getStepIds();
            DescribeStepRequest describe = new DescribeStepRequest().withStepId(id.get(0));
            describe.setClusterId(rb.getString("jobflowID"));
            describe.setRequestCredentials(awsCreds);
            DescribeStepResult res = emr.describeStep(describe);
            StepStatus status = res.getStep().getStatus();
            String stas = status.getState();

            // Poll every 5s while the step is PENDING or RUNNING.
            while (stas.equals(StepExecutionState.PENDING.name()) || stas.equals(StepExecutionState.RUNNING.name())) {
                try {
                    Thread.sleep(5000);
                    res = emr.describeStep(describe);
                    status = res.getStep().getStatus();
                    stas = status.getState();
                    log.info(stas);
                } catch (InterruptedException e) {
                    // FIX: the original swallowed the interrupt (printStackTrace
                    // only) and kept spinning. Restore the interrupt status so
                    // callers can observe it, and stop waiting on this step.
                    Thread.currentThread().interrupt();
                    log.info("Polling interrupted; abandoning wait for step " + id.get(0));
                    break;
                }
            }

            if (stas.equals(StepExecutionState.COMPLETED.name())) {
                folderS3.setProcessedInput(Boolean.TRUE);
                listaConcurrente.remove(folderS3);
                log.info("Step finalizado ok : " + folderS3);
            } else if (stas.equals(StepExecutionState.FAILED.name()) || stas.equals(StepExecutionState.CANCELLED.name())) {
                // Mark as unprocessed and re-queue so a later run can retry it.
                listaConcurrente.remove(folderS3);
                folderS3.setProcessedInput(Boolean.FALSE);
                listaConcurrente.add(folderS3);
                log.info("Step Fallo o fue Cancelado : " + folderS3);
            }

            // TODO: read step output and load into the database (original note).

        }
    }
}

关于java - 从 Java SDK 运行 Amazon EMR 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15202555/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com