- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在 Spark Scala 中有一些 DStream,我想对它进行排序然后取前 N 个。问题是,每当我尝试运行它时,我都会得到 NotSerializableException
并且异常消息显示:
This is because the DStream object is being referred to from within the closure.
问题是我不知道怎么解决:
这是我的尝试:
package com.badrit.realtime
import java.util.Date
import com.badrit.drivers.UnlimitedSpaceTimeDriver
import com.badrit.model.{CellBuilder, DataReader, Trip}
import com.badrit.utility.Printer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext}
import scala.collection.mutable
object StreamingDriver {
val appName: String = "HotSpotRealTime"
val hostName = "localhost"
val port = 5050
val constrains = UnlimitedSpaceTimeDriver.constrains;
var streamingRate = 1;
var windowSize = 8;
var slidingInterval = 2;
val cellBuilder = new CellBuilder(constrains)
val inputFilePath = "/home/ahmedelgamal/Downloads/green_tripdata_2015-02.csv"
def prepareTestData(sparkStreamCtx: StreamingContext): InputDStream[Trip] = {
val sparkCtx = sparkStreamCtx.sparkContext
val textFile: RDD[String] = sparkCtx.textFile(inputFilePath)
val data: RDD[Trip] = new DataReader().getTrips(textFile)
val groupedData = data.filter(_.pickup.date.before(new Date(2015, 1, 2, 0, 0, 0)))
.groupBy(trip => trip.pickup.date.getMinutes).sortBy(_._1).map(_._2).collect()
printf("Grouped Data Count is " + groupedData.length)
var dataQueue: mutable.Queue[RDD[Trip]] = mutable.Queue.empty;
groupedData.foreach(trips => dataQueue += sparkCtx.makeRDD(trips.toArray))
printf("\n\nTest Queue size is " + dataQueue.size)
groupedData.zipWithIndex.foreach { case (trips: Iterable[Trip], index: Int) => {
println("Items List " + index)
val passengers: Array[Int] = trips.map(_.passengers).toArray
val cnt = passengers.length
println("Sum is " + passengers.sum)
println("Cnt is " + cnt)
val passengersRdd = sparkCtx.parallelize(passengers)
println("Mean " + passengersRdd.mean())
println("Stdv" + passengersRdd.stdev())
}
}
sparkStreamCtx.queueStream(dataQueue, true)
}
def cellCreator(trip: Trip) = cellBuilder.cellForCarStop(trip.pickup)
def main(args: Array[String]) {
if (args.length < 1) {
streamingRate = 1;
windowSize = 3 //2 hours 60 * 60 * 1000L
slidingInterval = 2 //0.5 hour 60 * 60 * 1000L
}
else {
streamingRate = args(0).toInt;
windowSize = args(1).toInt
slidingInterval = args(2).toInt
}
val sparkConf = new SparkConf().setAppName(appName).setMaster("local[*]")
val sparkStreamCtx = new StreamingContext(sparkConf, Milliseconds(streamingRate))
sparkStreamCtx.sparkContext.setLogLevel("ERROR")
sparkStreamCtx.checkpoint("/tmp")
val data: InputDStream[Trip] = prepareTestData(sparkStreamCtx)
val dataWindow = data.window(new Duration(windowSize), new Duration(slidingInterval))
//my main problem lies in the following line
val newDataWindow = dataWindow.transform(rdd => sparkStreamCtx.sparkContext.parallelize(rdd.take(10)))
newDataWindow.print
sparkStreamCtx.start()
sparkStreamCtx.awaitTerminationOrTimeout(1000)
}
}
我不介意任何其他方式来对 DStream 进行排序并获得它的前 N 个,而不是我的方式。
最佳答案
可以在DStream对象中使用transform方法,然后对输入的RDD进行排序,将其中的n个元素放入一个列表中,然后过滤出原始的RDD,使其包含在这个列表中。
val n = 10
val topN = result.transform(rdd =>{
val list = rdd.sortBy(_._1).take(n)
rdd.filter(list.contains)
})
topN.print
关于scala - 对 DStream 进行排序并取 topN,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38813106/
我正在使用 AdventureWorks2016 数据库,并创建了一个简单的表格,显示所有产品及其总销售额。销售金额正确。 销售额值来自名为 [Internet Sales 总和] 的度量。看起来像这
我有一个类似于以下内容的数据集: Year Location Type Amount 2015 West Apple 12 2015 West
我正在使用 AdventureWorks2016 数据库,并创建了一个简单的表格,显示所有产品及其总销售额。销售金额正确。 销售额值来自名为 [Internet Sales 总和] 的度量。看起来像这
如下所示: ? 1
我在 Spark Scala 中有一些 DStream,我想对它进行排序然后取前 N 个。问题是,每当我尝试运行它时,我都会得到 NotSerializableException 并且异常消息显示:
python 中有 heapq,用于一般用途。我想记录 topN(0~20) 10e7 条记录。 如果使用 heapq,应使用 '-' 将最大值转换为最小值;并记录底部的最小数量,调用 heapq.h
我有一个 Spark SQL 数据帧: user1 item1 rating1 user1 item2 rating2 user1 item3 rating3 user2 item1 rating4
我有以下数据: individual groupID choice probA probB 0 9710535 0 0 0.02558
我是一名优秀的程序员,十分优秀!