
scala - Stateful streaming Spark processing


I am learning Spark and trying to build a simple streaming service.

For example, I have a Kafka queue and a Spark job similar to words count. That example uses stateless mode. I want to accumulate the word counts, so that if test is sent in several different messages, I get the total number of all its occurrences.

Using other examples such as StatefulNetworkWordCount, I tried to modify my Kafka streaming service:

val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))

ssc.checkpoint("/tmp/data")

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))

val wordDstream = words.map(x => (x, 1))

// Update the cumulative count using mapWithState
// This will give a DStream made of state (which is the cumulative count of the words)
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)
  output
}

val stateDstream = wordDstream.mapWithState(
  StateSpec.function(mappingFunc) /*.initialState(initialRDD)*/)

stateDstream.print()

stateDstream.map(s => (s._1, s._2.toString)).foreachRDD(rdd => sc.toRedisZSET(rdd, "word_count", 0))

// Start the computation
ssc.start()
ssc.awaitTermination()

I get a lot of errors like:

17/03/26 21:33:57 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@2b680207)
- field (class: com.DirectKafkaWordCount$$anonfun$main$2, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class com.DirectKafkaWordCount$$anonfun$main$2, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)

The stateless version, however, works fine without errors:

val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _).map(s => (s._1, s._2.toString))
wordCounts.print()

wordCounts.foreachRDD(rdd => sc.toRedisZSET(rdd, "word_count", 0))

// Start the computation
ssc.start()
ssc.awaitTermination()

The question is: how do I make the streaming word count stateful?

Best Answer

On this line:

ssc.checkpoint("/tmp/data")

you have enabled checkpointing, which means that everything in:

wordCounts.foreachRDD(rdd => sc.toRedisZSET(rdd, "word_count", 0))

has to be serializable, and sc itself is not, as you can see from the error message:

object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@2b680207)

Removing the checkpointing line of code will help with that.
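
If you need to keep checkpointing enabled (mapWithState relies on a checkpoint directory), a minimal sketch of an alternative, assuming the same spark-redis implicits that give you sc.toRedisZSET are in scope, is to take the SparkContext from the RDD inside the closure, so the non-serializable outer sc is never captured:

stateDstream.map { case (word, count) => (word, count.toString) }.foreachRDD { rdd =>
  // rdd.sparkContext is resolved on the driver for each batch, so the
  // registered closure itself contains no non-serializable references
  rdd.sparkContext.toRedisZSET(rdd, "word_count", 0)
}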

Another approach is to process your DStream's RDDs continuously and write the data to Redis directly, for example:

wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition(partition => RedisContext.setZset("word_count", partition, ttl, redisConfig))
}

RedisContext is a serializable object that does not depend on SparkContext.
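
The same per-partition pattern works with any plain Redis client. A rough sketch with Jedis (hypothetical host/port, and assuming the jedis dependency is on the classpath), where the connection is created on the executor and therefore never needs to be serialized:

import redis.clients.jedis.Jedis

stateDstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // one connection per partition, opened on the executor, never serialized
    val jedis = new Jedis("localhost", 6379)
    partition.foreach { case (word, count) =>
      // overwrite the member's score with the latest cumulative count
      jedis.zadd("word_count", count.toDouble, word)
    }
    jedis.close()
  }
}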

See also: https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala

Regarding scala - stateful streaming Spark processing, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/43035752/
