gpt4 book ai didi

apache-spark - Spark RDD foreachPartition 到 S3

转载 作者:行者123 更新时间:2023-12-01 11:24:33 24 4
gpt4 key购买 nike

我目前正在探索 Spark。我面临以下任务 - 获取一个 RDD,根据特定条件对其进行分区,然后将多个文件写入 S3 存储桶中的不同文件夹。

在我们来到上传到 S3 部分之前,一切都很好。我已经在 SO 上阅读了与此问题相关的所有问题,发现我可以对 RDD 使用 AmazonS3ClientsaveToTextFile 方法。我面临两个问题:

  1. 如果我使用 AmazonS3Client,我会得到一个 java.io.NotSerializableException,因为代码是从 Spark 驱动程序发送到它需要的 worker序列化,显然 AmazonS3Client 不支持它。

  2. 如果我使用 saveToTextFile,我会遇到类似的问题。当我进入 foreachPartition 循环时,我需要获取 Iterable[T](在本例中为 p),所以如果我想使用 saveToTextFile 我需要创建 Iterable 的 RDD,因此需要 parallelize。问题是 SparkContext sc 也(理所当然地)没有序列化。

rdd.foreachPartition { p =>
sc.parallelize(p.toSeq).saveAsTextFile(s"s3n://")
}

任何帮助将不胜感激。

最佳答案

没有必要这样做。您可以将 saveAsTextFile 与 rdd 一起使用:

rdd.saveAsTextFile(s"s3n://dir/to/aux/file")

saveAsTextFile 将写入包含文件许多部分(与分区一样多的部分)的文件夹中的 S3。然后,如果需要,您可以合并为一个文件:

  def mergeToS3(srcPath: String, dstPath: String, sc: SparkContext): Unit = {
val hadoopConfig = sc.hadoopConfiguration
val fs = FileSystem.get(new URI(srcPath), hadoopConfig)
FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(dstPath), true, hadoopConfig, null)
}

mergeToS3("s3n://dir/to/aux/file", "s3n://dir/to/singleFile",sc)

关于apache-spark - Spark RDD foreachPartition 到 S3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38746045/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com