gpt4 book ai didi

apache-spark - 值 toDF 不是 org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] 的成员

转载 作者:行者123 更新时间:2023-12-04 04:53:52 25 4
gpt4 key购买 nike

在 Spark 2.0 中使用 Scala 将 LDA 预处理(Pre-LDA)转换的结果转换为 DataFrame 时出现编译错误。引发错误的具体代码如下:

// Excerpt of the failing statement: run the fitted pre-LDA pipeline, keep the docId and features
// columns, drop down to an RDD of (Long, MLVector) tuples, then try to turn it back into a DataFrame.
// NOTE(review): the compiler rejects the final .toDF() call; it presumably needs the session's
// implicits in scope (e.g. `import mp_spark.implicits._`) — confirm against the Spark SQL docs.
val documents = PreLDAmodel.transform(mp_listing_lda_df)
.select("docId","features")
.rdd
.map{ case Row(row_num: Long, features: MLVector) => (row_num, features) }
.toDF()

完整的编译错误是:
Error:(132, 8) value toDF is not a member of org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)]
possible cause: maybe a semicolon is missing before `value toDF'?
.toDF()

这是完整的代码:
import java.io.FileInputStream
import java.sql.{DriverManager, ResultSet}
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, RegexTokenizer, StopWordsRemover}
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.clustering.{LDA => oldLDA}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Loads marketplace listing titles from SQL Server over JDBC, fits a
 * tokenizer -> stop-word remover -> count-vectorizer pipeline on them, saves the
 * fitted pipeline, and prepares the (docId, features) DataFrame and LDA estimator.
 *
 * Configuration is read once, at object initialization, from
 * `mpclassification.properties`; initialization fails if the file cannot be read
 * or a required key is missing.
 */
object MPClassificationLDA {
  /*Start: Configuration variable initialization*/
  val props = new Properties
  val fileStream = new FileInputStream("U:\\JIRA\\MP_Classification\\target\\classes\\mpclassification.properties")
  // Close the stream even when loading fails; after a successful load every
  // value has already been copied into `props`, so the stream is no longer needed.
  try props.load(fileStream)
  finally fileStream.close()

  /** Returns the value stored under `key`, failing with a message that names the missing key. */
  private def requiredProperty(key: String): String =
    Option(props.getProperty(key)).getOrElse(
      throw new IllegalStateException(s"Missing required property '$key' in mpclassification.properties"))

  val mpExtract: String = requiredProperty("mpExtract")
  val shard6_db_server_name: String = requiredProperty("shard6_db_server_name")
  val shard6_db_user_id: String = requiredProperty("shard6_db_user_id")
  val shard6_db_user_pwd: String = requiredProperty("shard6_db_user_pwd")
  val mp_output_file: String = requiredProperty("mp_output_file")
  val spark_warehouse_path: String = requiredProperty("spark_warehouse_path")
  val rf_model_file_path: String = requiredProperty("rf_model_file_path")
  val windows_hadoop_home: String = requiredProperty("windows_hadoop_home")
  val lda_vocabulary_size: Int = requiredProperty("lda_vocabulary_size").toInt
  val pre_lda_model_file_path: String = requiredProperty("pre_lda_model_file_path")
  val lda_model_file_path: String = requiredProperty("lda_model_file_path")
  /*End: Configuration variable initialization*/

  val conf = new SparkConf().set("spark.sql.warehouse.dir", spark_warehouse_path)

  /**
   * Entry point: builds a local SparkSession, runs the LDA preparation with
   * fixed query parameters, and always stops the session afterwards.
   *
   * @param arg command-line arguments (not read)
   */
  def main(arg: Array[String]): Unit = {
    //SQL Query definition and parameter values as parameter upon executing the Object
    val cont_id = "14211599"
    val top = "100000"
    val start_date = "2016-05-01"
    val end_date = "2016-06-01"

    val mp_spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("MPClassificationLoadLDA")
      .config(conf)
      .getOrCreate()

    // Stop the session even when the computation throws.
    try MPClassificationLDACalculation(mp_spark, cont_id, top, start_date, end_date)
    finally mp_spark.stop()
  }

  /**
   * Reads the listings, fits and saves the pre-LDA feature pipeline, and builds
   * the (docId, features) DataFrame together with the LDA estimator.
   *
   * @param mp_spark   session used for the JDBC read and all DataFrame work
   * @param cont_id    value substituted for `@cont_id` in the query
   * @param top        value substituted for `@top` in the query
   * @param start_date value substituted for `@start_date` in the query
   * @param end_date   value substituted for `@end_date` in the query
   */
  private def MPClassificationLDACalculation(
      mp_spark: SparkSession,
      cont_id: String,
      top: String,
      start_date: String,
      end_date: String
  ): Unit = {
    // Brings the RDD -> DataFrame conversions (.toDF) into scope. Without this import
    // the compiler reports "value toDF is not a member of org.apache.spark.rdd.RDD[...]".
    import mp_spark.implicits._

    //DB connection definition
    def createConnection() = {
      Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver").newInstance()
      DriverManager.getConnection(
        s"jdbc:sqlserver://${shard6_db_server_name};user=${shard6_db_user_id};password=${shard6_db_user_pwd}")
    }

    //DB Field Names definition
    def extractvalues(r: ResultSet) =
      Row(r.getString(1), r.getString(2))

    //Prepare SQL Statement with parameter value replacement
    // Literal `replace` (not the regex-based `replaceAll`) so that `$` or `\` in a
    // value cannot be misread as a regex group reference or escape.
    // The two `?` placeholders are left in place for JdbcRDD to bind.
    val query = """SELECT docId = audt_id, text = auction_title FROM brands6.dbo.uf_ds_marketplace_classification_listing(@cont_id, @top, '@start_date', '@end_date') WHERE ? < ? OPTION(RECOMPILE);"""
      .replace("@cont_id", cont_id)
      .replace("@top", top)
      .replace("@start_date", start_date)
      .replace("@end_date", end_date)

    //Connect to Source DB and execute the Prepared SQL Statement
    val mpDataRDD = new JdbcRDD(mp_spark.sparkContext
      ,createConnection
      ,query
      ,lowerBound = 0
      ,upperBound = 10000000
      ,numPartitions = 1
      ,mapRow = extractvalues)

    val schema_string = "docId,text"
    val fields = StructType(schema_string.split(",")
      .map(fieldname => StructField(fieldname, StringType, true)))

    //Create Data Frame using format identified through schema_string
    val mpDF = mp_spark.createDataFrame(mpDataRDD, fields)

    // Both columns arrive as strings; docId is cast to long for the Row pattern match below.
    val mp_listing_tmp = mpDF.selectExpr("cast(docId as long) docId", "text")
    mp_listing_tmp.printSchema()
    println(mp_listing_tmp.first)

    val mp_listing_lda_df = mp_listing_tmp.withColumn("docId", mp_listing_tmp("docId"))
    mp_listing_lda_df.printSchema()

    val tokenizer = new RegexTokenizer()
      .setInputCol("text")
      .setOutputCol("rawTokens")
      .setMinTokenLength(2)

    val stopWordsRemover = new StopWordsRemover()
      .setInputCol("rawTokens")
      .setOutputCol("tokens")

    // NOTE(review): lda_vocabulary_size is loaded from the properties file, but this
    // hard-coded value is what the vectorizer uses — confirm which one is intended.
    val vocabSize = 4000

    val countVectorizer = new CountVectorizer()
      .setVocabSize(vocabSize)
      .setInputCol("tokens")
      .setOutputCol("features")

    val PreLDApipeline = new Pipeline()
      .setStages(Array(tokenizer, stopWordsRemover, countVectorizer))

    val PreLDAmodel = PreLDApipeline.fit(mp_listing_lda_df)
    //comment out after saving it the first time
    PreLDAmodel.write.overwrite().save(pre_lda_model_file_path)

    // Tuples become columns "_1" (docId) and "_2" (features) after toDF().
    val documents = PreLDAmodel.transform(mp_listing_lda_df)
      .select("docId", "features")
      .rdd
      .map { case Row(row_num: Long, features: MLVector) => (row_num, features) }
      .toDF()

    //documents.printSchema()
    val numTopics: Int = 20
    val maxIterations: Int = 100

    //note the FeaturesCol need to be set
    val lda = new LDA()
      .setOptimizer("em")
      .setK(numTopics)
      .setMaxIter(maxIterations)
      .setFeaturesCol("_2")

    // Stage index 2 is the fitted CountVectorizer, per the stage order passed to setStages above.
    val vocabArray = PreLDAmodel.stages(2).asInstanceOf[CountVectorizerModel].vocabulary
  }
}

我认为这与代码导入部分的冲突有关。感谢任何帮助。

最佳答案

需要做的2件事:

导入隐式转换:请注意,这一步只能在 org.apache.spark.sql.SQLContext 的实例创建之后进行。应该写成:

// Create the SQLContext first; `sc` is assumed to be an existing SparkContext (not shown in this snippet).
val sqlContext= new org.apache.spark.sql.SQLContext(sc)

// Import the implicits from that instance, after it exists; per the answer this is what makes .toDF() available.
// NOTE(review): with a Spark 2.x SparkSession the equivalent is presumably `import <session>.implicits._` — confirm.
import sqlContext.implicits._

将 case class 移到方法之外:用于定义 DataFrame 结构(schema)的 case class,应该定义在使用它的方法之外。您可以在此处阅读更多相关信息: https://issues.scala-lang.org/browse/SI-6649

关于apache-spark - 值 toDF 不是 org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] 的成员,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39839984/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com