gpt4 book ai didi

scala - 如何将 mapPartitions 的 Iterator[String] 结果写入一个文件?

转载 作者:行者123 更新时间:2023-12-04 06:09:54 29 4
gpt4 key购买 nike

我是 Spark Scala 的新手,这就是为什么我很难通过它。

我打算做的是使用 Spark 使用 Stanford CoreNLP 预处理我的数据。我知道我必须使用 mapPartitions 以便每个分区有一个 StanfordCoreNLP 实例,如 this thread 中所建议的那样.但是,我缺乏知识/理解如何从这里开始。

最后,我想根据这些数据训练词向量,但现在我很乐意了解如何从这里获取处理后的数据并将其写入另一个文件。

这是我到目前为止得到的:

import java.util.Properties

import com.google.gson.Gson
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.util.CoreMap
import masterthesis.code.wordvectors.Review
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConversions._

object ReviewPreprocessing {

def main(args: Array[String]) {

val resourceUrl = getClass.getResource("amazon-reviews/reviews_Electronics.json")
val file = sc.textFile(resourceUrl.getPath)

val linesPerPartition = file.mapPartitions( lineIterator => {

val props = new Properties()
props.put("annotators", "tokenize, ssplit, pos, lemma")

val sentencesAsTextList : List[String] = List()
val pipeline = new StanfordCoreNLP(props)
val gson = new Gson()

while(lineIterator.hasNext) {

val line = lineIterator.next
val review = gson.fromJson(line, classOf[Review])
val doc = new Annotation(review.getReviewText)

pipeline.annotate(doc)

val sentences : java.util.List[CoreMap] = doc.get(classOf[SentencesAnnotation])
val sb = new StringBuilder();

sentences.foreach( sentence => {
val tokens = sentence.get(classOf[TokensAnnotation])
tokens.foreach( token => {
sb.append(token.get(classOf[LemmaAnnotation]))
sb.append(" ")
})
})
sb.setLength(sb.length - 1)
sentencesAsTextList.add(sb.toString)
}

sentencesAsTextList.iterator
})

System.exit(0)
}

}

我会怎样,例如将此结果写入一个文件?此处的顺序无关紧要 - 我想此时无论如何顺序都丢失了。

最佳答案

如果您在 RDD 上使用 saveAsTextFile,您最终会拥有与您拥有的分区一样多的输出文件。为了只有一个,您可以将所有内容合并到一个分区中,例如

sc.textFile("/path/to/file")
.mapPartitions(someFunc())
.coalesce(1)
.saveAsTextFile("/path/to/another/file")

或者(只是为了好玩)您可以将所有分区一个一个地驱动并自己保存所有数据。

val it = sc.textFile("/path/to/file")
.mapPartitions(someFunc())
.toLocalIterator

while(it.hasNext) {
writeToFile(it.next())
}

关于scala - 如何将 mapPartitions 的 Iterator[String] 结果写入一个文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36376626/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com