gpt4 book ai didi

scala - 如何在 EMR 上使用 spark 有效地读取/解析 s3 文件夹中的 .gz 文件负载

转载 作者:可可西里 更新时间:2023-11-01 15:43:18 24 4
gpt4 key购买 nike

我正在尝试通过在 EMR 上执行的 spark 应用程序读取 s3 目录中的所有文件。

数据以典型格式存储,如“s3a://Some/path/yyyy/mm/dd/hh/blah.gz”

如果我使用深度嵌套的通配符(例如“s3a://SomeBucket/SomeFolder/////*.gz”),性能会很糟糕并且需要大约 40 分钟阅读几万个 gzip 压缩的小 json 文件。它可以工作,但是浪费 40 分钟来测试一些代码真的很糟糕。

我的研究告诉我还有另外两种方法性能更高。

使用 hadoop.fs 库 (2.8.5) 我尝试读取我提供的每个文件路径。

private def getEventDataHadoop(
eventsFilePaths: RDD[String]
)(implicit sqlContext: SQLContext): Try[RDD[String]] =
Try(
{
val conf = sqlContext.sparkContext.hadoopConfiguration

eventsFilePaths.map(eventsFilePath => {
val p = new Path(eventsFilePath)
val fs = p.getFileSystem(conf)
val eventData: FSDataInputStream = fs.open(p)
IOUtils.toString(eventData)
})
}
)

这些文件路径由以下代码生成:

private[disneystreaming] def generateInputBucketPaths(
s3Protocol: String,
bucketName: String,
service: String,
region: String,
yearsMonths: Map[String, Set[String]]
): Try[Set[String]] =
Try(
{
val days = 1 to 31
val hours = 0 to 23
val dateFormatter: Int => String = buildDateFormat("00")

yearsMonths.flatMap { yearMonth: (String, Set[String]) =>
for {
month: String <- yearMonth._2
day: Int <- days
hour: Int <- hours
} yield
s"$s3Protocol$bucketName/$service/$region/${dateFormatter(yearMonth._1.toInt)}/${dateFormatter(month.toInt)}/" +
s"${dateFormatter(day)}/${dateFormatter(hour)}/*.gz"
}.toSet
}
)

hadoop.fs 代码失败,因为 Path 类不可序列化。我想不出如何解决这个问题。

所以这导致我采用另一种使用 AmazonS3Client 的方法,在这种方法中,我只要求客户端提供文件夹(或前缀)中的所有文件路径,然后将文件解析为字符串,这可能会因为它们被压缩:

 private def getEventDataS3(bucketName: String, prefix: String)(
implicit sqlContext: SQLContext
): Try[RDD[String]] =
Try(
{
import com.amazonaws.services.s3._, model._
import scala.collection.JavaConverters._

val request = new ListObjectsRequest()
request.setBucketName(bucketName)
request.setPrefix(prefix)
request.setMaxKeys(Integer.MAX_VALUE)
val s3 = new AmazonS3Client(new ProfileCredentialsProvider("default"))

val objs: ObjectListing = s3.listObjects(request) // Note that this method returns truncated data if longer than the "pageLength" above. You might need to deal with that.
sqlContext.sparkContext
.parallelize(objs.getObjectSummaries.asScala.map(_.getKey).toList)
.flatMap { key =>
Source
.fromInputStream(s3.getObject(bucketName, key).getObjectContent: InputStream)
.getLines()
}
}
)

此代码产生空异常,因为配置文件不能为空(“java.lang.IllegalArgumentException:配置文件不能为空”)。请记住此代码在 AWS 内的 EMR 上运行,那么我如何提供它需要的凭证?其他人如何使用此客户端在 EMR 上运行 spark 作业?

非常感谢任何有助于使这些方法发挥作用的帮助。

最佳答案

路径在以后的 Hadoop 版本中是可序列化的,因为能够在 Spark RDD 中使用它很有用。在那之前,将路径转换为 ​​URI,编码它,并在你的闭包中从该 URI 创建一个新路径。

关于scala - 如何在 EMR 上使用 spark 有效地读取/解析 s3 文件夹中的 .gz 文件负载,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55692428/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com