gpt4 book ai didi

scala - 如何在 Apache Spark 中使用 DStream 进行特征提取

转载 作者:行者123 更新时间:2023-12-04 19:49:59 24 4
gpt4 key购买 nike

我有通过 DStream 从 Kafka 到达的数据。我想进行特征提取以获得一些关键词。

我不想等待所有数据的到达(因为它应该是可能永远不会结束的连续流),所以我希望以 block 的形式执行提取——准确性是否会受到影响对我来说无关紧要有一点。

到目前为止,我整理了类似的东西:

def extractKeywords(stream: DStream[Data]): Unit = {

val spark: SparkSession = SparkSession.builder.getOrCreate

val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData

val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _

val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData

streamWithFeatures.print()
}

def extractFeatures(spark: SparkSession)
(rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {

val df = spark.createDataFrame(rdd).toDF("data", "words")

val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures)
val rawFeatures = hashingTF.transform(df)

val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(rawFeatures)

val rescaledData = idfModel.transform(rawFeature)

import spark.implicits._
rescaledData.select("data", "features").as[(Data, Array[String])].rdd
}

但是,我收到了 java.lang.IllegalStateException: Haven't seen any document yet. - 我并不感到惊讶,因为我只是想把东西拼凑起来,我明白这一点,因为我没有等待一些数据的到来,当我尝试在数据上使用生成的模型时,它可能是空的。

解决这个问题的正确方法是什么?

最佳答案

我使用了评论中的建议并将程序分成 2 次运行:

  • 计算 IDF 模型并将其保存到文件中

    def trainFeatures(idfModelFile: File, rdd: RDD[(String, Seq[String])]) = {
    val session: SparkSession = SparkSession.builder.getOrCreate

    val wordsDf = session.createDataFrame(rdd).toDF("data", "words")

    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
    val featurizedDf = hashingTF.transform(wordsDf)

    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    val idfModel = idf.fit(featurizedDf)

    idfModel.write.save(idfModelFile.getAbsolutePath)
    }
  • 从文件中读取 IDF 模型并对所有传入信息简单地运行它

    val idfModel = IDFModel.load(idfModelFile.getAbsolutePath)

    val documentDf = spark.createDataFrame(rdd).toDF("update", "document")

    val tokenizer = new Tokenizer().setInputCol("document").setOutputCol("words")
    val wordsDf = tokenizer.transform(documentDf)

    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
    val featurizedDf = hashingTF.transform(wordsDf)

    val extractor = idfModel.setInputCol("rawFeatures").setOutputCol("features")
    val featuresDf = extractor.transform(featurizedDf)

    featuresDf.select("update", "features")

关于scala - 如何在 Apache Spark 中使用 DStream 进行特征提取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40996430/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com