gpt4 book ai didi

apache-spark - 如何将转换后的数据从分区发送到 S3?

转载 作者:行者123 更新时间:2023-12-04 04:21:14 24 4
gpt4 key购买 nike

我有一个RDD,它太大了collect。我对 RDD 应用了一系列转换,并希望将其转换后的数据直接从我的从节点上的分区发送到 S3。我目前操作如下:

val rdd:RDD = initializeRDD
val rdd2 = rdd.transform
rdd2.first // in order to force calculation of RDD
rdd2.foreachPartition sendDataToS3

不幸的是,发送到 S3 的数据是未转换的。 RDD 看起来和 initializeRDD 阶段完全一样。

这里是 sendDataToS3 的主体:

implicit class WriteableRDD[T](rdd:RDD[T]){

def transform:RDD[String] = rdd map {_.toString}

....
def sendPartitionsToS3(prefix:String) = {
rdd.foreachPartition { p =>
val filename = prefix+new scala.util.Random().nextInt(1000000)
val pw = new PrintWriter(new File(filename))
p foreach pw.println
pw.close
s3.putObject(S3_BUCKET, filename, new File(filename))
}
this
}

}

这是用 rdd.transform.sendPartitionsToS3(prefix) 调用的。

如何确保在 sendDataToS3 中发送的数据是转换后的数据?

最佳答案

我猜你的代码中有一个问题没有包含在问题中。

无论如何我都会回答,只是为了确保您了解 RDD.saveAsTextFile。您可以为其提供 S3 上的路径 (s3n://bucket/directory),它会直接从执行程序将每个分区写入该路径。

我很难想象您什么时候需要实现自己的 sendPartitionsToS3 而不是使用 saveAsTextFile

关于apache-spark - 如何将转换后的数据从分区发送到 S3?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33704073/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com