- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我有一个在 Yarn 集群上运行的 spark 应用程序,它需要从 S3 兼容对象存储上的多个存储桶中读取文件,每个存储桶都有自己的一组凭据。
根据 hadoop documentation应该可以通过设置 spark.hadoop.fs.s3a.bucket.<bucket-name>.access.key=<access-key>
形式的配置来为多个存储桶指定凭证在事件SparkSession
但这在实践中对我不起作用。
根据文档,我认为应该可行的示例:
import org.apache.spark.sql.{SaveMode, SparkSession}
case class BucketCredential(bucketName: String, accessKey: String, secretKey: String)
object TestMultiBucketReadWrite {
val credentials: Seq[BucketCredential] = Seq(
BucketCredential("bucket.1", "access.key.1", "secret.key.1"),
BucketCredential("bucket.2", "access.key.2", "secret.key.2")
)
def addCredentials(sparkBuilder: SparkSession.Builder, credentials: Seq[BucketCredential]): SparkSession.Builder = {
var sBuilder = sparkBuilder
for (credential <- credentials) {
sBuilder = sBuilder
.config(s"spark.hadoop.fs.s3a.bucket.${credential.bucketName}.access.key", credential.accessKey)
.config(s"spark.hadoop.fs.s3a.bucket.${credential.bucketName}.secret.key", credential.secretKey)
}
sBuilder
}
def main(args: Array[String]): Unit = {
val spark = addCredentials(SparkSession.builder(), credentials)
.appName("Test MultiBucket Credentials")
.getOrCreate()
import spark.implicits._
val dummyDF = Seq(1,2,3,4,5).toDS()
println("Testing multi write...")
credentials.foreach(credential => {
val bucket = credential.bucketName
dummyDF.write.mode(SaveMode.Overwrite).json(s"s3a://$bucket/test.json")
})
println("Testing multi read...")
credentials.foreach(credential => {
val bucket = credential.bucketName
val df = spark.read.json(s"s3a://$bucket/test.json").as[Long]
println(df.collect())
})
}
}
但是,提交作业失败并出现以下错误:
Testing multi write...
Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: null
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:93)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:545)
当我改为设置 fs.s3a.access.key
时,作业确实成功了和 fs.s3a.secret.key
按顺序设置,但涉及顺序读/写:
//...
println("Testing multi write...")
credentials.foreach(credential => {
val bucket = credential.bucketName
spark.conf.set("fs.s3a.access.key", credential.accessKey)
spark.conf.set("fs.s3a.secret.key", credential.secretKey)
dummyDF.write.mode(SaveMode.Overwrite).json(s"s3a://$bucket/test.json")
})
println("Testing multi read...")
credentials.foreach(credential => {
val bucket = credential.bucketName
spark.conf.set("fs.s3a.access.key", credential.accessKey)
spark.conf.set("fs.s3a.secret.key", credential.secretKey)
val df = spark.read.json(s"s3a://$bucket/test.json").as[Long]
println(df.collect())
})
//...
最佳答案
Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: null
403 Forbidden 表示理解请求但无法服务....
s3 帐户没有您的多个存储桶之一的访问权限。请再次检查...
原因之一可能是代理问题...
AWS 使用 http 代理连接到 aws 集群。我希望这些代理设置是正确的在您的 shell 脚本中定义这些示例变量,
SPARK_DRIVER_JAVA_OPTS="
-Dhttp.proxyHost=${PROXY_HOST}
-Dhttp.proxyPort=${PROXY_PORT}
-Dhttps.proxyHost=${PROXY_HOST}
-Dhttps.proxyPort=${PROXY_PORT}
$SPARK_DRIVER_JAVA_OPTS"
SPARK_EXECUTOR_JAVA_OPTS="
-Dhttp.proxyHost=${PROXY_HOST}
-Dhttp.proxyPort=${PROXY_PORT}
-Dhttps.proxyHost=${PROXY_HOST}
-Dhttps.proxyPort=${PROXY_PORT}
$SPARK_EXECUTOR_JAVA_OPTS"
SPARK_OPTS="
--conf spark.hadoop.fs.s3a.proxy.host=${PROXY_HOST}
--conf spark.hadoop.fs.s3a.proxy.port=${PROXY_PORT}
$SPARK_OPTS"
spark 提交看起来像...
spark-submit \
--executor-cores $SPARK_EXECUTOR_CORES \
--executor-memory $SPARK_EXECUTOR_MEMORY \
--driver-memory $SPARK_DRIVER_MEMORY \
--driver-java-options "$SPARK_DRIVER_JAVA_OPTS" \
--conf spark.executor.extraJavaOptions="$SPARK_EXECUTOR_JAVA_OPTS" \
--master $SPARK_MASTER \
--class $APP_MAIN \
$SPARK_OPTS \
$APP_JAR "$@"
注意:据我所知,如果您有 AWS EMR 的 s3 访问权限,则无需每次都设置访问 key ,因为它是隐式的
关于scala - 从 Spark 中的多个 S3 存储桶中读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57700345/
目前正在学习 Spark 的类(class)并了解到执行者的定义: Each executor will hold a chunk of the data to be processed. Thisc
阅读了有关 http://spark.apache.org/docs/0.8.0/cluster-overview.html 的一些文档后,我有一些问题想要澄清。 以 Spark 为例: JavaSp
Spark核心中的调度器与以下Spark Stack(来自Learning Spark:Lightning-Fast Big Data Analysis一书)中的Standalone Schedule
我想在 spark-submit 或 start 处设置 spark.eventLog.enabled 和 spark.eventLog.dir -all level -- 不要求在 scala/ja
我有来自 SQL Server 的数据,需要在 Apache Spark (Databricks) 中进行操作。 在 SQL Server 中,此表的三个键列使用区分大小写的 COLLATION 选项
所有这些有什么区别和用途? spark.local.ip spark.driver.host spark.driver.bind地址 spark.driver.hostname 如何将机器修复为 Sp
我有大约 10 个 Spark 作业,每个作业都会进行一些转换并将数据加载到数据库中。必须为每个作业单独打开和关闭 Spark session ,每次初始化都会耗费时间。 是否可以只创建一次 Spar
/Downloads/spark-3.0.1-bin-hadoop2.7/bin$ ./spark-shell 20/09/23 10:58:45 WARN Utils: Your hostname,
我是 Spark 的完全新手,并且刚刚开始对此进行更多探索。我选择了更长的路径,不使用任何 CDH 发行版安装 hadoop,并且我从 Apache 网站安装了 Hadoop 并自己设置配置文件以了解
TL; 博士 Spark UI 显示的内核和内存数量与我在使用 spark-submit 时要求的数量不同 更多细节: 我在独立模式下运行 Spark 1.6。 当我运行 spark-submit 时
spark-submit 上的文档说明如下: The spark-submit script in Spark’s bin directory is used to launch applicatio
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 6 个月前关闭。 Improve
我想了解接收器如何在 Spark Streaming 中工作。根据我的理解,将有一个接收器任务在执行器中运行,用于收集数据并保存为 RDD。当调用 start() 时,接收器开始读取。需要澄清以下内容
有没有办法在不同线程中使用相同的 spark 上下文并行运行多个 spark 作业? 我尝试使用 Vertx 3,但看起来每个作业都在排队并按顺序启动。 如何让它在相同的 spark 上下文中同时运行
我们有一个 Spark 流应用程序,这是一项长期运行的任务。事件日志指向 hdfs 位置 hdfs://spark-history,当我们开始流式传输应用程序时正在其中创建 application_X
我们正在尝试找到一种加载 Spark (2.x) ML 训练模型的方法,以便根据请求(通过 REST 接口(interface))我们可以查询它并获得预测,例如http://predictor.com
Spark newb 问题:我在 spark-sql 中进行完全相同的 Spark SQL 查询并在 spark-shell . spark-shell版本大约需要 10 秒,而 spark-sql版
我正在使用 Spark 流。根据 Spark 编程指南(参见 http://spark.apache.org/docs/latest/programming-guide.html#accumulato
我正在使用 CDH 5.2。我可以使用 spark-shell 运行命令。 如何运行包含spark命令的文件(file.spark)。 有没有办法在不使用 sbt 的情况下在 CDH 5.2 中运行/
我使用 Elasticsearch 已经有一段时间了,但使用 Cassandra 的经验很少。 现在,我有一个项目想要使用 Spark 来处理数据,但我需要决定是否应该使用 Cassandra 还是
我是一名优秀的程序员,十分优秀!