gpt4 book ai didi

java - 使用带迭代器的 mapPartition 保存 spark RDD

转载 作者:可可西里 更新时间:2023-11-01 14:25:32 32 4
gpt4 key购买 nike

我有一些中间数据需要存储在 HDFS 和本地。我正在使用 Spark 1.6。在作为中间形式的 HDFS 中,我在 /output/testDummy/part-00000/output/testDummy/part-00001 中获取数据。我想使用 Java/Scala 将这些分区保存在本地,这样我就可以将它们保存为 /users/home/indexes/index.nt(通过在本地合并)或 /users/home/indexes/index-0000.nt/home/indexes/index-0001.nt 分开。

这是我的代码:注意:testDummy 与 test 相同,输出有两个分区。我想将它们单独存储或组合存储,但在本地使用 index.nt 文件。我更喜欢分别存储在两个数据节点中。我正在使用集群并在 YARN 上提交 spark 作业。我还添加了一些评论、次数以及我得到的数据。我怎么办?感谢您的帮助。

 val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
println("testDummy done") //1 time print

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
println("Inside savesData") // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
println("iter size"+iterator.size) // 2 735 2 735 values
val filenamesWithExtension = outputPath + "/index.nt"
println("filenamesWithExtension "+filenamesWithExtension.length) //4 times
var list = List[(String)]()

val fileWritter = new FileWriter(filenamesWithExtension,true)
val bufferWritter = new BufferedWriter(fileWritter)

while (iterator.hasNext){ //iterator.hasNext is false
println("inside iterator") //0 times
val dat = iterator.next()
println("datadata "+iterator.next())

bufferWritter.write(dat + "\n")
bufferWritter.flush()
println("index files written")

val dataElements = dat.split(" ")
println("dataElements") //0
list = list.::(dataElements(0))
list = list.::(dataElements(1))
list = list.::(dataElements(2))
}
bufferWritter.close() //closing
println("savesData method end") //4 times when coal=2
list.iterator
}

println("before saving data into local") //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions) //2
println("testRDD size "+test.collect().length) //0
println("after saving data into local") //1

PS:我关注了,thisthis但与我正在寻找的不完全相同,我以某种方式做了但没有在 index.nt

中得到任何东西

最佳答案

一些事情:

  • 如果您打算稍后使用数据,请不要调用 Iterator.size迭代器TraversableOnce .计算 Iterator 大小的唯一方法是遍历它的所有元素,然后就没有更多的数据要读取了。
  • 不要使用像 mapPartitions 这样的转换来产生副作用。如果您想执行某种类型的 IO,请使用 foreach/foreachPartition 等操作。这是一种不好的做法,并不能保证给定的代码片段只会执行一次。
  • Action 或转换中的本地路径是特定工作人员的本地路径。如果您想直接在客户端机器上写入,您应该先使用 collecttoLocalIterator 获取数据。写入分布式存储并稍后获取数据可能会更好。

关于java - 使用带迭代器的 mapPartition 保存 spark RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38044231/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com