gpt4 book ai didi

scala - 如何以高性能的方式将1个RDD分成6个部分?

转载 作者:行者123 更新时间:2023-12-02 09:15:35 25 4
gpt4 key购买 nike

我构建了一个 Spark RDD,其中该 RDD 的每个元素都是表示 XML 记录的 JAXB 根​​元素。

我想拆分这个 RDD,以便从该集合中生成 6 个 RDD。本质上,此作业只是将分层 XML 结构转换为 6 组平面 CSV 记录。目前,我正在六次传递同一个 RDD 6 来执行此操作。

 xmlRdd.cache()
val rddofTypeA = xmlRdd.map {iterate over XML Object and create Type A}
rddOfTypeA.saveAsTextFile("s3://...")

val rddofTypeB = xmlRdd.map { iterate over XML Object and create Type B}
rddOfTypeB.saveAsTextFile("s3://...")

val rddofTypeC = xmlRdd.map { iterate over XML Object and create Type C}
rddOfTypeC.saveAsTextFile("s3://...")

val rddofTypeD = xmlRdd.map { iterate over XML Object and create Type D}
rddOfTypeD.saveAsTextFile("s3://...")

val rddofTypeE = xmlRdd.map { iterate over XML Object and create Type E}
rddOfTypeE.saveAsTextFile("s3://...")

val rddofTypeF = xmlRdd.map { iterate over XML Object and create Type F}
rddOfTypeF.saveAsTextFile("s3://...")

我的输入数据集是 3500 万条记录,分为 186 个文件,每个文件大小为 448MB,存储在 Amazon S3 中。我的输出目录也在 S3 上。我正在使用 EMR Spark。

对于六节点 m4.4xlarge 集群,完成拆分和写入输出需要 38 分钟。

是否有一种有效的方法可以在不遍历 RDD 六次的情况下实现此目的?

最佳答案

最简单的解决方案(从 Spark 开发人员的角度来看)是在单独的线程上为每个 RDD 执行 mapsaveAsTextFile

不为人所知(因此未被利用)的是,SparkContext 是线程安全的,因此可用于从单独的线程提交作业。

话虽如此,您可以执行以下操作(使用带有 Future 的最简单的 Scala 解决方案,但不一定是最好的,因为 Future 在实例化时开始计算,而不是在您这么说):

xmlRdd.cache()

import scala.concurrent.ExecutionContext.Implicits.global
val f1 = Future {
val rddofTypeA = xmlRdd.map { map xml to csv}
rddOfTypeA.saveAsTextFile("s3://...")
}

val f2 = Future {
val rddofTypeB = xmlRdd.map { map xml to csv}
rddOfTypeB.saveAsTextFile("s3://...")
}

...

Future.sequence(Seq(f1,f2)).onComplete { ... }

这可以减少映射和保存的时间,但不会减少数据集的扫描次数。无论如何,这应该不是什么大问题,因为数据集被缓存并因此存储在内存和/或磁盘中(Spark SQL 的 Dataset.cache 中的默认持久性级别为 MEMORY_AND_DISK)。

关于scala - 如何以高性能的方式将1个RDD分成6个部分?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47492010/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com