gpt4 book ai didi

apache-spark - 如何让 AWS 上的本地 Spark 写入 S3

转载 作者:行者123 更新时间:2023-12-04 04:46:37 28 4
gpt4 key购买 nike

我已经在 AWS EC2 实例上安装了 Spark 2.4.3 和 Hadoop 3.2。我一直在本地模式下使用 spark(主要是 pyspark)并取得了巨大的成功。能够旋转一些小的东西,然后在我需要动力时调整它的大小,并且非常快速地完成所有操作,这真是太好了。当我真的需要扩展时,我可以切换到 EMR 并去吃午饭。除了一个问题之外,一切都很顺利:我无法让本地 Spark 可靠地写入 S3(我一直在使用本地 EBS 空间)。这显然与有关 S3 作为文件系统的限制的文档中概述的所有问题有关。但是,使用最新的 hadoop,我对文档的阅读应该能够让它工作。

请注意,我知道另一个帖子,它提出了一个相关问题;这里有一些指导,但没有我能看到的解决方案。 How to use new Hadoop parquet magic commiter to custom S3 server with Spark

根据我对此处文档的最佳理解,我有以下设置(在不同地方设置):https://hadoop.apache.org/docs/r3.2.1/hadoop-aws/tools/hadoop-aws/index.html

fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem  
fs.s3a.committer.name: directory
fs.s3a.committer.magic.enabled: false
fs.s3a.committer.threads: 8
fs.s3a.committer.staging.tmp.path: /cache/staging
fs.s3a.committer.staging.unique-filenames: true
fs.s3a.committer.staging.conflict-mode: fail
fs.s3a.committer.staging.abort.pending.uploads: true
mapreduce.outputcommitter.factory.scheme.s3a: org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
fs.s3a.connection.maximum: 200
fs.s3a.fast.upload: true

一个相关的观点是我正在使用 Parquet 进行储蓄。我看到之前保存 Parquet 存在一些问题,但我在最新的文档中没有看到这一点。也许这就是问题所在?

无论如何,这是我得到的错误,这似乎表明 S3 在尝试重命名临时文件夹时出现的错误类型。是否有一些正确的设置可以使这种情况消失?
java.io.IOException: Failed to rename S3AFileStatus{path=s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/_temporary/0/_temporary/attempt_20190910022011_0004_m_000118_248/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet; isDirectory=false; length=185052; replication=1; blocksize=33554432; modification_time=1568082036000; access_time=0; owner=brett; group=brett; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} isEmptyDirectory=FALSE to s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:473)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:486)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:597)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:560)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
... 10 more

最佳答案

我帮助@brettc 进行了配置,我们找到了要设置的正确配置。

在 $SPARK_HOME/conf/spark-defaults.conf 下

# Enable S3 file system to be recognise
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

# Parameters to use new commiters
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.fs.s3a.committer.magic.enabled false
spark.hadoop.fs.s3a.commiter.staging.conflict-mode replace
spark.hadoop.fs.s3a.committer.staging.unique-filenames true
spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads true
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

如果您查看上面的最后 2 行配置,您会发现您需要包含 PathOutputCommitProtocol 和 BindingParquetOutputCommitter 类的 org.apache.spark.internal.io 库。为此,您必须下载 spark-hadoop-cloud jar here(在我们的示例中,我们采用了 2.3.2.3.1.0.6-1 版本)并将其放在 $SPARK_HOME/jars/下。

您可以通过创建 parquet 文件轻松验证您正在使用新的提交者。 _SUCCESS 文件应包含如下所示的 json:
{
"name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1",
"timestamp" : 1574729145842,
"date" : "Tue Nov 26 00:45:45 UTC 2019",
"hostname" : "<hostname>",
"committer" : "directory",
"description" : "Task committer attempt_20191125234709_0000_m_000000_0",
"metrics" : { [...] },
"diagnostics" : { [...] },
"filenames" : [...]
}

关于apache-spark - 如何让 AWS 上的本地 Spark 写入 S3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58495909/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com