gpt4 book ai didi

scala - 如何使用 Spark 在 S3 中捆绑多个文件

转载 作者:可可西里 更新时间:2023-11-01 14:15:00 25 4
gpt4 key购买 nike

我在 S3 中有 2000 万个文件,跨越大约 8000 天。

文件按 UTC 时间戳组织,如下所示:s3://mybucket/path/txt/YYYY/MM/DD/filename.txt.gz。每个文件都是 UTF-8 文本,包含 0(空)到 100KB 的文本(第 95 个百分位数,尽管有一些文件高达数 MB)。

使用 Spark 和 Scala(我对两者都不熟悉,想学习),我想保存“每日包”(其中 8000 个),每个包包含当天找到的任意数量的文件。理想情况下,我想存储原始文件名及其内容。输出也应驻留在 S3 中并以某种适合在进一步的 Spark 步骤和实验中输入的格式进行压缩。

一个想法是将包存储为一堆 JSON 对象(每行一个,以 '\n' 分隔),例如

{id:"doc0001", meta:{x:"blah", y:"foo", ...}, content:"some long string here"}
{id:"doc0002", meta:{x:"foo", y:"bar", ...}, content: "another long string"}

或者,我可以尝试使用 Hadoop SequenceFile,但我还是不确定如何优雅地设置它。

例如使用Spark shell,我看到读取文件非常容易,例如:

val textFile = sc.textFile("s3n://mybucket/path/txt/1996/04/09/*.txt.gz")
// or even
val textFile = sc.textFile("s3n://mybucket/path/txt/*/*/*/*.txt.gz")
// which will take for ever

但是我如何“拦截”读者提供文件名呢?

或者我应该获取所有文件的 RDD,按天拆分,并在减少步骤中写出 K=filename, V=fileContent

最佳答案

你可以用这个

首先您可以获得一个缓冲区/S3 路径列表:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
var files = new ArrayList[String]

//S3 Client and List Object Request
var s3Client = new AmazonS3Client();
var objectListing: ObjectListing = null;
var listObjectsRequest = new ListObjectsRequest();

//Your S3 Bucket
listObjectsRequest.setBucketName(s3_bucket)

//Your Folder path or Prefix
listObjectsRequest.setPrefix(base_prefix)

//Adding s3:// to the paths and adding to a list
do {
objectListing = s3Client.listObjects(listObjectsRequest);
for (objectSummary <- objectListing.getObjectSummaries().asScala) {
files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
}
listObjectsRequest.setMarker(objectListing.getNextMarker());
} while (objectListing.isTruncated());

//Removing Base Directory Name
files.remove(0)

//Creating a Scala List for same
files.asScala
}

现在把这个List对象传给下面的代码,注意:sc是SQLContext的一个对象

var df: DataFrame = null;
for (file <- files) {
val fileDf= sc.textFile(file)
if (df!= null) {
df= df.unionAll(fileDf)
} else {
df= fileDf
}
}

现在你得到了最终的统一 RDD,即 df

可选,您也可以将其重新分区到单个 BigRDD 中

val files = sc.textFile(filename, 1).repartition(1)

重新分区总是有效的:D

关于scala - 如何使用 Spark 在 S3 中捆绑多个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21819259/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com