
apache-spark - Sharing state between two streams in Spark Streaming

Reposted. Author: 行者123. Updated: 2023-12-01 04:56:46

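Can Spark Streaming state be shared between two DStreams?

Basically, I want to create and update the state from the first stream, and then use that state to enrich the second stream.

Example: I modified the StatefulNetworkWordCount example. The first stream creates the state, and I want to enrich the second stream with the counts from the first stream.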

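import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Minutes, State, StateSpec, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

// ssc, kafkaParams, kafkaParams2, topicsSet and mergeTopicSet are defined elsewhere
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

// First stream: add the incoming count to the stored count and update the state
val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)

  Some(output)
}

// Second stream: only read the stored count, never update it
val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
  val sum = state.getOption.getOrElse(0)
  val output = (word, sum)

  Some(output)
}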



// first stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  .flatMap(r => r._2.split(" "))
  .map(x => (x, 1))
  .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(1)

// second stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet)
  .flatMap(r => r._2.split(" "))
  .map(x => (x, 1))
  .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(50)

In the checkpoint directory I can see two different state RDDs.

I am using spark-1.6.1 and kafka-0.8.2.1.

Best answer

You can access the underlying state of the DStream produced by mapWithState through the stateMappedDStream.stateSnapshots() operation.

So, inspired by your example:

val firstDStream = ???
val secondDStream = ???
val firstDStreamSMapped = firstDStream.mapWithState(...)
// DStream of (key, state) pairs holding the current state of every key
val firstStreamState = firstDStreamSMapped.stateSnapshots()
// we want to use the state of Stream 1 to enrich Stream 2. The keys of both streams are required to match.
val enrichedStream = secondDStream.join(firstStreamState)
// ... do stuff with enrichedStream ...
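
For readers who want something closer to a complete program, here is a minimal sketch of the same idea. It is not the asker's Kafka setup: it assumes two socket text sources on localhost ports 9998 and 9999, a 5-second batch interval, and a checkpoint directory under /tmp, all of which are placeholders chosen for illustration. It reads the state through stateSnapshots(), the method MapWithStateDStream exposes for obtaining a DStream of (key, state) pairs, and it uses leftOuterJoin rather than join so that words the first stream has not seen yet are kept.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object SharedStateSketch {
  def main(args: Array[String]): Unit = {
    // local[4]: two socket receivers each occupy a core, the rest do the processing
    val conf = new SparkConf().setMaster("local[4]").setAppName("SharedStateSketch")
    val ssc = new StreamingContext(conf, Seconds(5))
    // mapWithState requires checkpointing; the path is a placeholder
    ssc.checkpoint("/tmp/shared-state-checkpoint")

    // Hypothetical sources standing in for the two Kafka topics in the question
    val firstWords = ssc.socketTextStream("localhost", 9998)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
    val secondWords = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Running count per word, kept as state by the first stream only
    val updateCount = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum)
    }

    val firstMapped = firstWords.mapWithState(StateSpec.function(updateCount))

    // (word, count) pairs for every key currently held in the state
    val firstState = firstMapped.stateSnapshots()

    // Enrich the second stream: (word, (1, Some(count))) when the first stream
    // has seen the word, (word, (1, None)) otherwise
    val enriched = secondWords.leftOuterJoin(firstState)
    enriched.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Only the first stream calls mapWithState here, so there is a single state to reason about; the second stream never owns state of its own. Note that the snapshot contains every key in the state on every batch, so the join may become expensive as the state grows.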

Regarding "apache-spark - Sharing state between two streams in Spark Streaming", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/36496882/
