
scala - Spark Streaming cumulative word count


This is a Spark Streaming program written in Scala. It counts the number of words read from a socket every 1 second. The result is a per-batch word count, e.g. the word count from time 0 to 1, then the word count from time 1 to 2. But I am wondering whether there is some way to change this program so that the word count is accumulated, i.e. the word count from time 0 up to now.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a socket stream on target ip:port and count the
// words in the input stream of \n delimited text (eg. generated by 'nc')
// Note that no replication in the storage level is needed when running locally;
// replication is necessary in a distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

Best Answer

You can use a StateDStream for this. There is an example of stateful word count among Spark's examples:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Log-level helper from Spark's examples package
    StreamingExamples.setStreamingLogLevels()

    // For each key, add this batch's counts to the previously stored count
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Checkpointing is required for stateful transformations
    ssc.checkpoint(".")

    // Create a NetworkInputDStream on target ip:port and count the
    // words in the input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This will give a DStream made of state (which is the cumulative count of the words)
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
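To try it out locally, you would typically start a socket source first, e.g. nc -lk 9999 (the port is just an example), then submit the application with the matching host and port as arguments; every word typed into the netcat session then bumps its cumulative count in each batch's output.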

The way this works is that you get a Seq[T] for each batch and then update an Option[T], which acts like an accumulator. The reason it is an Option is that on the first batch it will be None, and it stays None unless it gets updated. In this example the count is an Int; if you are dealing with large amounts of data you may even want a Long or a BigInt.
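For instance, here is a minimal sketch of that variant, assuming the same wordDstream: DStream[(String, Int)] as in the example above; cumulativeCounts is a hypothetical helper name, not part of the Spark API:

import org.apache.spark.streaming.dstream.DStream

// Same update logic as above, but the state is a Long so totals larger
// than Int.MaxValue do not overflow. (Sketch only; `cumulativeCounts`
// is a hypothetical name.)
def cumulativeCounts(wordDstream: DStream[(String, Int)]): DStream[(String, Long)] = {
  val updateFunc = (values: Seq[Int], state: Option[Long]) =>
    Some(state.getOrElse(0L) + values.sum)
  wordDstream.updateStateByKey[Long](updateFunc)
}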

Regarding scala - Spark Streaming cumulative word count, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24771823/
