- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我试图访问 Tuple2 中的一个字段,编译器向我返回一个错误。该软件尝试在 kafka 主题中推送案例类,然后我想使用 Spark 流恢复它,以便我可以提供机器学习算法并将结果保存在 mongo 实例中。
解决了!
我终于解决了我的问题,我将发布最终解决方案:
这是github项目:
https://github.com/alonsoir/awesome-recommendation-engine/tree/develop
name := "my-recommendation-spark-engine"
version := "1.0-SNAPSHOT"
scalaVersion := "2.10.4"
val sparkVersion = "1.6.1"
val akkaVersion = "2.3.11" // override Akka to be this version to match the one in Spark
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.10" % "0.8.1"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri"),
//not working play module!! check
//jdbc,
//anorm,
//cache,
// HTTP client
"net.databinder.dispatch" %% "dispatch-core" % "0.11.1",
// HTML parser
"org.jodd" % "jodd-lagarto" % "3.5.2",
"com.typesafe" % "config" % "1.2.1",
"com.typesafe.play" % "play-json_2.10" % "2.4.0-M2",
"org.scalatest" % "scalatest_2.10" % "2.2.1" % "test",
"org.twitter4j" % "twitter4j-core" % "4.0.2",
"org.twitter4j" % "twitter4j-stream" % "4.0.2",
"org.codehaus.jackson" % "jackson-core-asl" % "1.6.1",
"org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1" ,
"org.apache.spark" % "spark-core_2.10" % "1.6.1" ,
"org.apache.spark" % "spark-streaming_2.10" % "1.6.1",
"org.apache.spark" % "spark-sql_2.10" % "1.6.1",
"org.apache.spark" % "spark-mllib_2.10" % "1.6.1",
"com.google.code.gson" % "gson" % "2.6.2",
"commons-cli" % "commons-cli" % "1.3.1",
"com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
// Akka
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
// MongoDB
"org.reactivemongo" %% "reactivemongo" % "0.10.0"
)
packAutoSettings
//play.Project.playScalaSettings
package example.producer
import play.api.libs.json._
import example.utils._
import scala.concurrent.Future
import example.model.{AmazonProductAndRating,AmazonProduct,AmazonRating}
import example.utils.AmazonPageParser
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
args(0) : productId
args(1) : userdId
Usage: ./amazon-producer-example 0981531679 someUserId 3.0
*/
object AmazonProducerExample {
def main(args: Array[String]): Unit = {
val productId = args(0).toString
val userId = args(1).toString
val rating = args(2).toDouble
val topicName = "amazonRatingsTopic"
val producer = Producer[String](topicName)
//0981531679 is Scala Puzzlers...
AmazonPageParser.parse(productId,userId,rating).onSuccess { case amazonRating =>
//Is this the correct way? the best performance? possibly not, what about using avro or parquet? How can i push data in avro or parquet format?
//You can see that i am pushing json String to kafka topic, not raw String, but is there any difference?
//of course there are differences...
producer.send(Json.toJson(amazonRating).toString)
//producer.send(amazonRating.toString)
println("amazon product with rating sent to kafka cluster..." + amazonRating.toString)
System.exit(0)
}
}
}
package example.model
import play.api.libs.json.Json
import reactivemongo.bson.Macros
case class AmazonProduct(itemId: String, title: String, url: String, img: String, description: String)
case class AmazonRating(userId: String, productId: String, rating: Double)
case class AmazonProductAndRating(product: AmazonProduct, rating: AmazonRating)
// For MongoDB
object AmazonRating {
implicit val amazonRatingHandler = Macros.handler[AmazonRating]
implicit val amazonRatingFormat = Json.format[AmazonRating]
//added using @Yuval tip
lazy val empty: AmazonRating = AmazonRating("-1", "-1", -1d)
}
package example.spark
import java.io.File
import java.util.Date
import play.api.libs.json._
import com.google.gson.{Gson,GsonBuilder, JsonParser}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import com.mongodb.casbah.Imports._
import com.mongodb.QueryBuilder
import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.commons.{MongoDBList, MongoDBObject}
import reactivemongo.api.MongoDriver
import reactivemongo.api.collections.default.BSONCollection
import reactivemongo.bson.BSONDocument
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
import example.model._
import example.utils.Recommender
/**
* Collect at least the specified number of json amazon products in order to feed recomedation system and feed mongo instance with results.
Usage: ./amazon-kafka-connector 127.0.0.1:9092 amazonRatingsTopic
on mongo shell:
use alonsodb;
db.amazonRatings.find();
*/
object AmazonKafkaConnector {
private var numAmazonProductCollected = 0L
private var partNum = 0
private val numAmazonProductToCollect = 10000000
//this settings must be in reference.conf
private val Database = "alonsodb"
private val ratingCollection = "amazonRatings"
private val MongoHost = "127.0.0.1"
private val MongoPort = 27017
private val MongoProvider = "com.stratio.datasource.mongodb"
private val jsonParser = new JsonParser()
private val gson = new GsonBuilder().setPrettyPrinting().create()
private def prepareMongoEnvironment(): MongoClient = {
val mongoClient = MongoClient(MongoHost, MongoPort)
mongoClient
}
private def closeMongoEnviroment(mongoClient : MongoClient) = {
mongoClient.close()
println("mongoclient closed!")
}
private def cleanMongoEnvironment(mongoClient: MongoClient) = {
cleanMongoData(mongoClient)
mongoClient.close()
}
private def cleanMongoData(client: MongoClient): Unit = {
val collection = client(Database)(ratingCollection)
collection.dropCollection()
}
def main(args: Array[String]) {
// Process program arguments and set properties
if (args.length < 2) {
System.err.println("Usage: " + this.getClass.getSimpleName + " <brokers> <topics>")
System.exit(1)
}
val Array(brokers, topics) = args
println("Initializing Streaming Spark Context and kafka connector...")
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
.setMaster("local[4]")
.set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
sc.addJar("target/scala-2.10/blog-spark-recommendation_2.10-1.0-SNAPSHOT.jar")
val ssc = new StreamingContext(sparkConf, Seconds(2))
//this checkpointdir should be in a conf file, for now it is hardcoded!
val streamingCheckpointDir = "/Users/aironman/my-recommendation-spark-engine/checkpoint"
ssc.checkpoint(streamingCheckpointDir)
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
println("Initialized Streaming Spark Context and kafka connector...")
//create recomendation module
println("Creating rating recommender module...")
val ratingFile= "ratings.csv"
val recommender = new Recommender(sc,ratingFile)
println("Initialized rating recommender module...")
//THIS IS THE MOST INTERESTING PART AND WHAT I NEED!
//THE SOLUTION IS NOT PROBABLY THE MOST EFFICIENT, BECAUSE I HAD TO
//USE DATAFRAMES, ARRAYs and SEQs BUT IS FUNCTIONAL!
try{
messages.foreachRDD(rdd => {
val count = rdd.count()
if (count > 0){
val json= rdd.map(_._2)
val dataFrame = sqlContext.read.json(json) //converts json to DF
val myRow = dataFrame.select(dataFrame("userId"),dataFrame("productId"),dataFrame("rating")).take(count.toInt)
println("myRow is: " + myRow)
val myAmazonRating = AmazonRating(myRow(0).getString(0), myRow(0).getString(1), myRow(0).getDouble(2))
println("myAmazonRating is: " + myAmazonRating.toString)
val arrayAmazonRating = Array(myAmazonRating)
//this method needs Seq[AmazonRating]
recommender.predictWithALS(arrayAmazonRating.toSeq)
}//if
})
}catch{
case e: IllegalArgumentException => {println("illegal arg. exception")};
case e: IllegalStateException => {println("illegal state exception")};
case e: ClassCastException => {println("ClassCastException")};
case e: Exception => {println(" Generic Exception")};
}finally{
println("Finished taking data from kafka topic...")
}
ssc.start()
ssc.awaitTermination()
println("Finished!")
}
}
def predict(ratings: Seq[AmazonRating]) = {
// train model
val myRatings = ratings.map(toSparkRating)
val myRatingRDD = sc.parallelize(myRatings)
val startAls = DateTime.now
val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
val myProducts = myRatings.map(_.product).toSet
val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))
// get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations
val myUserId = userDict.getIndex(MyUsername)
val recommendations = model.predict(candidates.map((myUserId, _))).collect
val endAls = DateTime.now
val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
println(s"ALS Time: $alsTime seconds")
result
}
最佳答案
问题是声明rdd.take(count.toInt)
返回 Array[T]
,如前所述 here
def take(num: Int): Array[T]
Take the first num elements of the RDD.
RDD
说取其中的前 n 个元素。然后,与您猜测的不同,您没有类型
Tuple2
的对象。 ,而是一个数组。
mkString
在
Array
上定义输入获取单个
String
与数组的所有元素。
关于scala - 关于访问 Tuple2 中的字段时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37303202/
在有些场景下,我们需要对我们的varchar类型的字段做修改,而修改的结果为两个字段的拼接或者一个字段+字符串的拼接。 如下所示,我们希望将xx_role表中的name修改为name+id。
SELECT incMonth as Month, SUM( IF(item_type IN('typ1', 'typ2') AND incMonth = Month, 1, 0 ) )AS
我最近读到 volatile 字段是线程安全的,因为 When we use volatile keyword with a variable, all the threads read its va
我在一些模型中添加了一个 UUID 字段,然后使用 South 进行了迁移。我创建的任何新对象都正确填充了 UUID 字段。但是,我所有旧数据的 UUID 字段为空。 有没有办法为现有数据填充 UUI
刚刚将我的网站从 mysql_ 更新为 mysqli,并破坏了之前正常运行的查询。 我试图从旋转中提取 id,因为它每次都会增加 1,但我不断获取玩家 id,有人可以告诉我我做错了什么吗?我尝试了将
我在 Mac OS X 上使用带有 Sequel Pro 的 MySQL。我想将一个表中的一个字段(即名为“GAME_DY”的列)复制到另一个名为“DAY_ID”的表的空字段中。两个表都是同一数据库的
问题: 是否有可能有一个字段被 JPA 保留但被序列化跳过? 可以实现相反的效果(JPA 跳过字段而序列化则不会),如果使用此功能,那么相反的操作肯定会很有用。 类似这样的事情: @Entity cl
假设我有一个名为“dp”的表 Year | Month | Payment| Payer_ID | Payment_Recipient | 2008/2009 | July
我将尝试通过我的 Raspberry Pi 接入点保证一些 QoS。 开始之前,我先动手:我阅读了有关 tcp、udp 和 ip header 的内容。在IP header description我看
如果你能弄清楚如何重命名这个问题,我愿意接受建议。 在 Dart 语言中,可以编写一个带有 final 字段的类。这些是只能设置的字段构造函数前 body 跑。这可以在声明中(通常用于类中的静态常量)
你怎么样? 我有两个带有两个字段的日期选择器 我希望当用户选择 (From) 时,第二个字段 (TO) 将是 next day 。比如 booking.com 例如:当用户选择From 01-01-2
我想我已经看到了这个问题的一些答案,这些答案可能与我需要的相差不远,但我对 mysql 的了解还不够确定,所以我会根据我的具体情况提出问题。 我有一个包含多个表的数据库,为此,如果“image”表上的
我在 mySQL 数据库中有 2 个表: customers ============ customer_id (1, 2 ) customer_name (john, mark) orders ==
我正在开发一个员工目标 Web 应用程序。 领导/经理在与团队成员讨论后为他们设定目标。这是一年/半年/季度,具体取决于组织遵循的评估周期。 现在的问题是添加基于时间段的字段或存档上一季度/年度数据的
我正在寻找允许内容编辑器从媒体库中选择多个文件的东西,这些文件将在渲染中列出。他们还需要能够上传文件和搜索。它必须在页面编辑器(版本 8 中称为体验编辑器)中工作。 到目前为止我所考虑的: 一堆文件字
现在,我有以下由 original.df %.% group_by(Category) %.% tally() %.% arrange(desc(n)) 创建的 data.frame。 DF 5),
我想知道是否有一些步骤/解决方案可以处理错误消息并将它们放入 Pentaho 工具中的某个字符串或字段中?例如,如果连接到数据库时发生某些错误,则将该消息从登录到字符串/字段。 最佳答案 我们在作业的
如何制作像短信应用程序一样的“收件人”字段?例如,右侧有一个“+”按钮,当添加某人时,名称将突出显示并可单击,如圆角矩形等。有没有内置的框架? 最佳答案 不,但请参阅 Three20 的 TTMess
是否可以获取记录的元素或字段的列表 通过类型信息类似于类的已发布属性的列表吗? 谢谢 ! 最佳答案 取决于您的delphi版本,如果您使用的是delphi 2010或更高版本,则可以使用“新rtti”
我正在构建一个 SQLite 数据库来保存我的房地产经纪人的列表。我已经能够使用外键来识别每个代理的列表,但我想在每个代理的记录中创建一个列表;从代理商和列表之间的一对一关系转变为一对多关系。 看这里
我是一名优秀的程序员,十分优秀!