gpt4 book ai didi

amazon-web-services - 如何从 Lambda 函数在亚马逊 EMR 上执行 Spark 提交?

转载 作者:太空宇宙 更新时间:2023-11-03 20:56:10 25 4
gpt4 key购买 nike

我想根据S3上的文件上传事件在AWS EMR集群上执行spark提交作业。我正在使用 AWS Lambda 函数捕获事件,但我不知道如何从 Lambda 函数在 EMR 集群上提交 Spark 提交作业。

我搜索的大多数答案都谈到在 EMR 集群中添加一个步骤。但我不知道是否可以添加任何步骤来在添加的步骤中触发“spark Submit --with args”。

最佳答案

你可以,上周我也做了同样的事情!

使用适用于 Python 的 boto3(其他语言肯定会有类似的解决方案),您可以使用定义的步骤启动集群,或者将步骤附加到已启动的集群。

使用步骤定义集群

def lambda_handler(event, context):
conn = boto3.client("emr")
cluster_id = conn.run_job_flow(
Name='ClusterName',
ServiceRole='EMR_DefaultRole',
JobFlowRole='EMR_EC2_DefaultRole',
VisibleToAllUsers=True,
LogUri='s3n://some-log-uri/elasticmapreduce/',
ReleaseLabel='emr-5.8.0',
Instances={
'InstanceGroups': [
{
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm3.xlarge',
'InstanceCount': 1,
},
{
'Name': 'Slave nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm3.xlarge',
'InstanceCount': 2,
}
],
'Ec2KeyName': 'key-name',
'KeepJobFlowAliveWhenNoSteps': False,
'TerminationProtected': False
},
Applications=[{
'Name': 'Spark'
}],
Configurations=[{
"Classification":"spark-env",
"Properties":{},
"Configurations":[{
"Classification":"export",
"Properties":{
"PYSPARK_PYTHON":"python35",
"PYSPARK_DRIVER_PYTHON":"python35"
}
}]
}],
BootstrapActions=[{
'Name': 'Install',
'ScriptBootstrapAction': {
'Path': 's3://path/to/bootstrap.script'
}
}],
Steps=[{
'Name': 'StepName',
'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 's3n://elasticmapreduce/libs/script-runner/script-runner.jar',
'Args': [
"/usr/bin/spark-submit", "--deploy-mode", "cluster",
's3://path/to/code.file', '-i', 'input_arg',
'-o', 'output_arg'
]
}
}],
)
return "Started cluster {}".format(cluster_id)

将步骤附加到已运行的集群

根据 here

def lambda_handler(event, context):
conn = boto3.client("emr")
# chooses the first cluster which is Running or Waiting
# possibly can also choose by name or already have the cluster id
clusters = conn.list_clusters()
# choose the correct cluster
clusters = [c["Id"] for c in clusters["Clusters"]
if c["Status"]["State"] in ["RUNNING", "WAITING"]]
if not clusters:
sys.stderr.write("No valid clusters\n")
sys.stderr.exit()
# take the first relevant cluster
cluster_id = clusters[0]
# code location on your emr master node
CODE_DIR = "/home/hadoop/code/"

# spark configuration example
step_args = ["/usr/bin/spark-submit", "--spark-conf", "your-configuration",
CODE_DIR + "your_file.py", '--your-parameters', 'parameters']

step = {"Name": "what_you_do-" + time.strftime("%Y%m%d-%H:%M"),
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 's3n://elasticmapreduce/libs/script-runner/script-runner.jar',
'Args': step_args
}
}
action = conn.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
return "Added step: %s"%(action)

关于amazon-web-services - 如何从 Lambda 函数在亚马逊 EMR 上执行 Spark 提交?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56015402/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com