
python - How do I pass arguments to a workflow-template Spark job


I'm having a problem with my Spark Dataproc workflow.

This works when the job is submitted directly:

gcloud dataproc jobs submit spark \
--project myproject \
--cluster=mycluster \
--region=europe-west3 \
--jars=gs:path\file.jar,gs://path//depende.jar \
--class=it.flow \
--properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0
-- 20210820 010000 000 0 000 TRY

I created a Dataproc workflow template and Python code to launch it through Composer, and that works.

Now I need to make the final arguments dynamic (-- 20210820 010000 000 0 000 TRY).

However, I can't pass the parameters to the workflow:

gcloud dataproc workflow-templates create try1 --region=europe-west3

gcloud dataproc workflow-templates add-job spark \
--workflow-template=try1 \
--step-id=create_try1 \
--class=it.flow \
--region=europe-west3 \
--jars=gs:path\file.jar,gs://path//depende.jar \
--properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0 \
-- $arg1 $arg2

gcloud dataproc workflow-templates set-cluster-selector TRY1 --region=europe-west3 --cluster-labels=goog-dataproc-cluster-name=cluster

This call:

gcloud dataproc workflow-templates instantiate TRY1  --region=europe-west3 --parameters="arg1=20210820"

results in the following error:

ERROR: (gcloud.dataproc.workflow-templates.instantiate) INVALID_ARGUMENT: Template does not contain a parameter with name arg1.

How can I fix this?

The YAML file:

id: create_file
jobs:
- sparkJob:
    args:
    - ARG1
    - ARG2
    jarFileUris:
    - gs://mybucket/try_file.jar
    - gs://mybucket/try_dependencies_2.jar
    mainClass: org.apache.hadoop.examples.tryFile
    properties:
      spark.driver.cores: '2'
      spark.driver.memory: 10g
      spark.driver.userClassPathFirst: 'true'
      spark.dynamicAllocation.enabled: 'false'
      spark.executor.cores: '3'
      spark.executor.memory: 5g
      spark.executor.userClassPathFirst: 'true'
      spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
      spark.num.executors: '2'
  stepId: create_file_try
parameters:
- name: ARG1
  fields:
  - jobs['create_file_try'].sparkJob.args[0]
- name: ARG2
  fields:
  - jobs['create_file_try'].sparkJob.args[1]
name: projects/My-project-id/regions/europe-west3/workflowTemplates/create_file
updateTime: '2021-08-25T07:49:59.251096Z'

Best answer

To let your workflow template accept parameters, it is best to work with a YAML file. You can obtain that YAML when you run the full gcloud dataproc workflow-templates add-job spark command: it prints the resulting YAML config to the CLI.
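If you prefer not to copy the CLI output, a sketch of an alternative is to export the existing template straight to a file; this assumes the export subcommand is available in your gcloud release:

gcloud dataproc workflow-templates export try1 \
--region=europe-west3 \
--destination=config.yaml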

In this example I only used the sample code from the Dataproc documentation and tested it with your values in --properties.

Note: I used a dummy project-id in the YAML file for this example. Make sure to use your actual project-id so you don't run into any issues.

Sample command:

gcloud dataproc workflow-templates add-job spark \
--workflow-template=try1 \
--step-id=create_try1 \
--class=org.apache.hadoop.examples.WordCount \
--region=europe-west3 \
--jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
--properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0 \
-- ARG1 ARG2

CLI output (YAML config):

id: try1
jobs:
- sparkJob:
    args:
    - ARG1
    - ARG2
    jarFileUris:
    - file:///usr/lib/spark/examples/jars/spark-examples.jar
    mainClass: org.apache.hadoop.examples.WordCount
    properties:
      spark.driver.cores: '2'
      spark.driver.memory: 10g
      spark.driver.userClassPathFirst: 'true'
      spark.dynamicAllocation.enabled: 'false'
      spark.executor.cores: '3'
      spark.executor.memory: 5g
      spark.executor.userClassPathFirst: 'true'
      spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
      spark.num.executors: '2'
  stepId: create_try1
name: projects/your-project-id/regions/europe-west3/workflowTemplates/try1
placement:
  managedCluster:
    clusterName: mycluster
updateTime: '2021-08-25T03:30:47.365244Z'
version: 3

Copy the generated YAML config, open it in a text editor, and add a parameters: field. It will contain the arguments you want to accept.

parameters:
- name: ARG1
  fields:
  - jobs['create_try1'].sparkJob.args[0] # use the stepId in jobs[], in this example it is 'create_try1'
- name: ARG2
  fields:
  - jobs['create_try1'].sparkJob.args[1]

In this example, I placed it right after stepId:.

Edited YAML config:

id: try1
jobs:
- sparkJob:
    args:
    - ARG1
    - ARG2
    jarFileUris:
    - file:///usr/lib/spark/examples/jars/spark-examples.jar
    mainClass: org.apache.hadoop.examples.WordCount
    properties:
      spark.driver.cores: '2'
      spark.driver.memory: 10g
      spark.driver.userClassPathFirst: 'true'
      spark.dynamicAllocation.enabled: 'false'
      spark.executor.cores: '3'
      spark.executor.memory: 5g
      spark.executor.userClassPathFirst: 'true'
      spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
      spark.num.executors: '2'
  stepId: create_try1
parameters:
- name: ARG1
  fields:
  - jobs['create_try1'].sparkJob.args[0]
- name: ARG2
  fields:
  - jobs['create_try1'].sparkJob.args[1]
name: projects/your-project-id/regions/europe-west3/workflowTemplates/try1
placement:
  managedCluster:
    clusterName: mycluster
updateTime: '2021-08-25T03:13:25.014685Z'
version: 3

Overwrite your workflow template with the edited YAML file:

gcloud dataproc workflow-templates import try1 \
--region=europe-west3 \
--source=config.yaml
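Optionally, you can check that the parameters: block was actually imported by describing the template, which prints the stored configuration back to the CLI:

gcloud dataproc workflow-templates describe try1 \
--region=europe-west3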

Run the template with gcloud dataproc workflow-templates instantiate:

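A minimal sketch of the instantiation, passing values for the ARG1 and ARG2 parameters defined above (the values shown are placeholders based on the arguments in the question):

gcloud dataproc workflow-templates instantiate try1 \
--region=europe-west3 \
--parameters="ARG1=20210820,ARG2=010000"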

For more details, you can refer to Parameterization of Workflow Templates.

For more on python - How do I pass arguments to a workflow-template Spark job, see the similar question on Stack Overflow: https://stackoverflow.com/questions/68911200/
