- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在尝试使用 Databricks 提供的动手实验室执行 spark mllib ALS 时,是否有人知道此错误的可能原因?
14/11/20 23:33:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/11/20 23:33:39 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
Got 27980 ratings from 24071 users on 4211 movies.
Training: 27989, validation: 0, test: 0
Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:806)
at MovieLensALS$.computeRmse(MovieLensALS.scala:149)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply$mcVI$sp(MovieLensALS.scala:95)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply(MovieLensALS.scala:93)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$mcVD$sp$1.apply(MovieLensALS.scala:93)
at scala.collection.immutable.List.foreach(List.scala:318)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply$mcVD$sp(MovieLensALS.scala:93)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply(MovieLensALS.scala:93)
at MovieLensALS$$anonfun$main$1$$anonfun$apply$mcVI$sp$1.apply(MovieLensALS.scala:93)
at scala.collection.immutable.List.foreach(List.scala:318)
at MovieLensALS$$anonfun$main$1.apply$mcVI$sp(MovieLensALS.scala:93)
at MovieLensALS$$anonfun$main$1.apply(MovieLensALS.scala:93)
at MovieLensALS$$anonfun$main$1.apply(MovieLensALS.scala:93)
at scala.collection.immutable.List.foreach(List.scala:318)
at MovieLensALS$.main(MovieLensALS.scala:93)
at MovieLensALS.main(MovieLensALS.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
更新:没问题!我正在使用这个类。在 https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html 中可用和 https://databricks-training.s3.amazonaws.com/getting-started.html#additional-required-download .让我知道是否还有其他可以帮助的东西
import java.io.File
import scala.io.Source
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._`enter code here`
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
object MovieLensALS {
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
if (args.length != 2) {
println("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g --class MovieLensALS " +
"target/scala-*/movielens-als-ssembly-*.jar movieLensHomeDir personalRatingsFile")
sys.exit(1)
}
// set up environment
val conf = new SparkConf()
.setAppName("MovieLensALS")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
// load personal ratings
val myRatings = loadRatings(args(1))
val myRatingsRDD = sc.parallelize(myRatings, 1)
// load ratings and movie titles
val movieLensHomeDir = args(0)
val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line =>
val fields = line.split("::")
// format: (timestamp % 10, Rating(userId, movieId, rating))
(fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
}
val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line =>
val fields = line.split("::")
// format: (movieId, movieName)
(fields(0).toInt, fields(1))
}.collect().toMap
val numRatings = ratings.count()
val numUsers = ratings.map(_._2.user).distinct().count()
val numMovies = ratings.map(_._2.product).distinct().count()
println("Got " + numRatings + " ratings from "
+ numUsers + " users on " + numMovies + " movies.")
// split ratings into train (60%), validation (20%), and test (20%) based on the
// last digit of the timestamp, add myRatings to train, and cache them
val numPartitions = 4
val training = ratings.filter(x => x._1 < 6)
.values
.union(myRatingsRDD)
.repartition(numPartitions)
.cache()
val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)
.values
.repartition(numPartitions)
.cache()
val test = ratings.filter(x => x._1 >= 8).values.cache()
val numTraining = training.count()
val numValidation = validation.count()
val numTest = test.count()
println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)
// train models and evaluate them on the validation set
val ranks = List(8, 12)
val lambdas = List(0.1, 10.0)
val numIters = List(10, 20)
var bestModel: Option[MatrixFactorizationModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestNumIter = -1
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
val model = ALS.train(training, rank, numIter, lambda)
val validationRmse = computeRmse(model, validation, numValidation)
println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "
+ rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
if (validationRmse < bestValidationRmse) {
bestModel = Some(model)
bestValidationRmse = validationRmse
bestRank = rank
bestLambda = lambda
bestNumIter = numIter
}
}
// evaluate the best model on the test set
val testRmse = computeRmse(bestModel.get, test, numTest)
println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
+ ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")
// create a naive baseline and compare it with the best model
val meanRating = training.union(validation).map(_.rating).mean
val baselineRmse =
math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean)
val improvement = (baselineRmse - testRmse) / baselineRmse * 100
println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")
// make personalized recommendations
val myRatedMovieIds = myRatings.map(_.product).toSet
val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
val recommendations = bestModel.get
.predict(candidates.map((0, _)))
.collect()
.sortBy(- _.rating)
.take(50)
var i = 1
println("Movies recommended for you:")
recommendations.foreach { r =>
println("%2d".format(i) + ": " + movies(r.product))
i += 1
}
// clean up
sc.stop()
}
/** Compute RMSE (Root Mean Squared Error). */
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
.join(data.map(x => ((x.user, x.product), x.rating)))
.values
math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
}
/** Load ratings from file. */
def loadRatings(path: String): Seq[Rating] = {
val lines = Source.fromFile(path).getLines()
val ratings = lines.map { line =>
val fields = line.split("::")
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}.filter(_.rating > 0.0)
if (ratings.isEmpty) {
sys.error("No ratings provided.")
} else {
ratings.toSeq
}
}
}
最佳答案
可能是因为您(或 computeRmse 方法)正在使用某些过滤器,所以 reduce 方法在空集合/RDD 上被调用,因此抛出“空集合”。尝试仔细检查过滤器或 computeRmse() 函数。
关于java - 星火 UnsupportedOperationException : empty collection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27053036/
我已经在 Window 中安装了 Spark。我正在尝试从 D: 驱动器加载文本文件。 RDD 正在正常创建,但是当我对该接收错误执行任何操作时。我尝试了斜线的所有组合但没有成功 scala> val
我正在尝试使用 Spark 的 MLlib 在 Java 上实现 KMeans,我偶然发现了一个问题,那就是,尽管我导入了正确的 jar,但我的编译器无法识别这一行 // Cluster the da
在尝试使用 Databricks 提供的动手实验室执行 spark mllib ALS 时,是否有人知道此错误的可能原因? 14/11/20 23:33:38 WARN Utils: Set SPAR
DataSet 中有 firm 列,我正在向该 DataSet 添加另一列 - firm_id 示例: private val firms: mutable.Map[String, Integer]
DirectFileOutputCommitter 在 Spark 2.2.0 中不再可用。这意味着写入 S3 需要非常长的时间(3 小时对 2 分钟)。我可以通过在 spark-shell 中将 F
在 DataFrame Apache Spark 中的对象(我正在使用 Scala 接口(interface)),如果我正在迭代它的 Row对象,有没有办法按名称提取值?我可以看到如何做一些非常尴尬的
我正在尝试从此处 (Learning Spark book) 运行示例,但出现以下错误: 16/10/07 01:15:26 ERROR Executor: Exception in task 0.0
我从 GH 开发大师那里构建了 Spark 1.4,并且构建顺利。但是当我执行 bin/pyspark 时,我得到了 Python 2.7.9 版本。我该如何更改? 最佳答案 只需设置环境变量: 导出
我开始玩 Spark 2.0.1。新数据集 API 非常干净,但我在执行非常简单的操作时遇到了问题。 也许我遗漏了什么,希望有人能提供帮助。 这些说明 SparkConf conf = new Spa
我有这段代码在 scala 中运行良好: val schema = StructType(Array( StructField("field1", StringType, true),
我正在构建一个 Spark 应用程序,我必须在其中缓存大约 15 GB 的 CSV 文件。我读到了新的 UnifiedMemoryManager Spark 1.6 在这里介绍: https://0x
您好,我正在使用自定义 UDF 对每列中的每个值求平方根。 square_root_UDF = udf(lambda x: math.sqrt(x), DoubleType()) for x in f
我有一个 Apache Spark 应用程序在集群模式下运行在 YARN 集群上(spark 在这个集群上有 3 个节点)。 当应用程序运行时,Spark-UI 显示 2 个执行程序(每个运行在不同的
我有一个有效的 PostgreSQL 查询:当我在 PSQL 中复制/粘贴它时,我得到了想要的结果。 但是,当我使用 Spark SQL 运行时,它会导致 NullPointerException。
我需要使用 spark-sql 读取一个文件,该文件在当前目录中。 我使用此命令解压缩存储在 HDFS 上的文件列表。 val decompressCommand = Seq(laszippath,
提交复制并粘贴到下方的 Spark 应用程序时,我遇到了以下错误。我首先对这个问题进行了一些谷歌搜索,并确定这可能是一个签名 jar 的问题,该签名 jar 作为对我的 jar 构建过程的依赖项被加载
这个问题在这里已经有了答案: How to aggregate values into collection after groupBy? (3 个答案) Spark merge/combine a
我想通过将一个巨大的 csv 文件分割为不同的分区来优化 Spark 应用程序的运行时间,具体取决于它们的特性。 例如。我有一列带有客户 ID(整数,a),一列带有日期(月 + 年,例如 01.201
我在尝试将文件从 hdfs 读取到 Spark 时遇到错误。文件 README.md 存在于 hdfs 中 spark@osboxes hadoop]$ hdfs dfs -ls README.md
一段时间后,无法弄清楚如何在运行 spark-sql 二进制文件时识别以下错误的根本原因: 15/12/08 14:48:41 WARN NativeCodeLoader: Unable to loa
我是一名优秀的程序员,十分优秀!