
java - Exception in thread "main" org.apache.spark.SparkException: Task not serializable

Reposted. Author: 行者123. Updated: 2023-12-01 10:06:49

Hello,

I am using Scala 2.11.8 and Spark 1.6.1. Whenever I call a function inside map, it throws the following exception:

"Exception in thread "main" org.apache.spark.SparkException: Task not serializable"

You can find my complete code below.

package moviestream.recommender
import java.io
import java.io.Serializable

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.jblas.DoubleMatrix


class FeatureExtraction {

  val conf = new SparkConf().setMaster("local[2]").setAppName("Recommendation")
  val sc = new SparkContext(conf)
  val rawData = sc.textFile("data/u.data")
  val rawRatings = rawData.map(_.split("\t").take(3))

  //create Rating objects from rawRatings
  val ratings = rawRatings.map { case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }
  //use the Spark ALS library to train our model
  // Build the recommendation model using ALS

  val model = ALS.train(ratings, 50, 10, 0.01)
  //val model = ALS.trainImplicit(ratings,50,10,0.01,0.1) //last parameter is alpha
  val predictedRating = model.predict(789, 123)
  //top ten recommended movies for user id 789, where k = number of recommendations (10)
  val topKRecs = model.recommendProducts(789, 10)
  val movies = sc.textFile("data/u.item")
  val titles = movies.map(line => line.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
  //how many movies this user has rated
  val moviesForUser = ratings.keyBy(_.user).lookup(789)
  //we will take the 10 movies with the highest ratings by sorting the collection using the rating field of the object
  //moviesForUser.sortBy(-_.rating).take(10).map(rating=>(titles(rating.product),rating.rating)).foreach(println)
  //let's take a look at the top 10 recommendations for this user and see what the titles are
  //topKRecs.map(rating=>(titles(rating.product),rating.rating)).foreach(println)
  // we will then need to create a DoubleMatrix object

  val itemId = 567
  val itemFactor = model.productFeatures.lookup(itemId).head
  val itemVector = new DoubleMatrix(itemFactor)


  //we are ready to apply our similarity metric to each item
  /*val sims = model.productFeatures.map{ case (id, factor) =>
    val factorVector = new DoubleMatrix(factor)
    val sim = cosineSimilarity(factorVector, itemVector)
    (id, sim)
  }*/

  //we can compute the top 10 most similar items by sorting on the similarity score for each item
  //val sortedSims = sims.top(10)(Ordering.by[(Int,Double),Double]{case(id,similarity)=>similarity})

  //we can sense check our item-to-item similarity
  //val sortedSims2 = sims.top(11)(Ordering.by[(Int,Double),Double]{case(id,similarity)=>similarity})
  //sortedSims2.slice(1,11).map{case (id,sim)=>(titles(id),sim)}.foreach(println)
  //Finally, we can print the 10 items with the highest computed similarity metric to our given item:
  //println("Result = "+titles(123))

  def cosineSimilarity(vect1: DoubleMatrix, vect2: DoubleMatrix): Double = {
    //cosine similarity divides the dot product by the product of the two L2 norms
    vect1.dot(vect2) / (vect1.norm2() * vect2.norm2())
  }

  val actualRating = moviesForUser.take(1)(0)
  val predictedRatings = model.predict(789, actualRating.product)
  //println(predictedRatings)
  val squaredError = math.pow(predictedRatings - actualRating.rating, 2.0)

  val usersProducts = ratings.map { case Rating(user, product, rating) => (user, product) }
  val predictions = model.predict(usersProducts).map { case Rating(user, product, rating) =>
    ((user, product), rating)
  }
  val ratingsAndPredictions = ratings.map { case Rating(user, product, rating) => ((user, product), rating) }
    .join(predictions)
  val MSE = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) =>
    math.pow(actual - predicted, 2)
  }.reduce(_ + _) / ratingsAndPredictions.count()
  //println("Mean Squared Error = " + MSE)
  val RMSE = math.sqrt(MSE)
  println("Root Mean Squared Error = " + RMSE)
  def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
    val predk = predicted.take(k)
    var score = 0.0
    var numHits = 0.0
    for ((p, i) <- predk.zipWithIndex) {
      if (actual.contains(p)) {
        numHits += 1.0
        score += numHits / (i.toDouble + 1.0)
      }
    }
    if (actual.isEmpty) {
      1.0
    } else {
      score / scala.math.min(actual.size, k).toDouble
    }
  }

  val actualMovies = moviesForUser.map(_.product)
  val predictedMovies = topKRecs.map(_.product)
  //predictedMovies.foreach(println)
  val apk10 = avgPrecisionK(actualMovies, predictedMovies, 10)
  //println(apk10)
  //Locality Sensitive Hashing
  val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
  val itemMatrix = new DoubleMatrix(itemFactors)
  //println(itemMatrix.rows,itemMatrix.columns)
  val imBroadcast = sc.broadcast(itemMatrix)
  //println(imBroadcast)
  val allRecs = model.userFeatures.map { case (userId, array) =>
    val userVector = new DoubleMatrix(array)
    val scores = imBroadcast.value.mmul(userVector)
    val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
    val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
    (userId, recommendedIds)
  }
  println(allRecs)

}

Best answer

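As mentioned in the comments above, this question is too broad, but one guess might help. You are using the broadcast value imBroadcast inside map. I guess that closure also pulls in functions declared in the same scope as the SparkContext, right? If so, move them to a separate object.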
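
The suggestion above can be illustrated without Spark. What follows is a minimal sketch, not the asker's code: `FakeContext`, `Driver`, `Similarity`, and `ClosureCheck` are hypothetical names, and plain Java serialization stands in for what Spark does by default when it ships a task closure. It assumes Scala 2.12 or later; older compilers encode closures differently, though the principle should be the same. A closure that calls a method of the enclosing class holds a reference to `this`, and with it the non-serializable context, whereas a closure that calls a method on a standalone object captures only its arguments. The same reasoning applies to values such as imBroadcast when they are fields of the class that also holds the SparkContext.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately not Serializable.
class FakeContext

// Helper moved into a standalone object, as the answer suggests.
object Similarity {
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val normA = math.sqrt(a.map(x => x * x).sum)
    val normB = math.sqrt(b.map(x => x * x).sum)
    dot / (normA * normB)
  }
}

// Mirrors the shape of the question: context and helper live in the same class.
class Driver {
  val ctx = new FakeContext

  def cosineInClass(a: Array[Double], b: Array[Double]): Double =
    Similarity.cosine(a, b)

  // Calls a method of this class, so the closure holds a reference to `this`
  // and therefore to `ctx`.
  def usingClassMethod(item: Array[Double]): Array[Double] => Double =
    v => cosineInClass(v, item)

  // Calls a method on the standalone object; only `item` is captured.
  def usingObjectMethod(item: Array[Double]): Array[Double] => Double =
    v => Similarity.cosine(v, item)
}

object ClosureCheck {
  // Attempts plain Java serialization of the given closure.
  def isSerializable(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val driver = new Driver
    val item = Array(1.0, 0.0)
    println("class-method closure serializable: " + isSerializable(driver.usingClassMethod(item)))
    println("object-method closure serializable: " + isSerializable(driver.usingObjectMethod(item)))
  }
}
```

Applied to the question's code, the equivalent change would be to define cosineSimilarity in a separate object, or to build the RDD pipeline inside a method where sc, model, and imBroadcast are local values rather than class fields.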

Regarding java - Exception in thread "main" org.apache.spark.SparkException: Task not serializable, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36384714/
