gpt4 book ai didi

apache-spark - 如何在同一个 Spark Streaming 上运行多个操作

转载 作者:行者123 更新时间:2023-12-03 09:26:54 25 4
gpt4 key购买 nike

我正在将 Spark-streaming 与 RabbitMQ 一起使用。因此,流作业从 RabbitMQ 获取数据并应用一些转换和操作。所以,我想知道如何在同一个流上应用多个操作(即计算两个不同的特征集)。是否可以?如果是,如何将流对象传递给代码中提到的多个类?

            val config = ConfigFactory.parseFile(new File("SparkStreaming.conf"))
val conf = new SparkConf(true).setAppName(config.getString("AppName"))
conf.set("spark.cleaner.ttl", "120000")

val sparkConf = new SparkContext(conf)
val ssc = new StreamingContext(sparkConf, Seconds(config.getLong("SparkBatchInterval")))

val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("RealTimeQueueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("QueueExchangeName"), "routingKeys" -> config.getString("QueueRoutingKey"))
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
receiverStream.start()

如何从这里处理流:

            val objProcessFeatureSet1 = new ProcessFeatureSet1(Some_Streaming_Object)
val objProcessFeatureSet2 = new ProcessFeatureSet2(Some_Streaming_Object)

ssc.start()
ssc.awaitTermination()

最佳答案

您可以在同一个数据流上运行多个操作,如下所示:

import net.minidev.json.JSONValue
import net.minidev.json.JSONObject

val config = ConfigFactory.parseFile(new File("SparkStreaming.conf"))
val conf = new SparkConf(true).setAppName(config.getString("AppName"))
conf.set("spark.cleaner.ttl", "120000")

val sparkConf = new SparkContext(conf)
val ssc = new StreamingContext(sparkConf, Seconds(config.getLong("SparkBatchInterval")))

val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("RealTimeQueueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("QueueExchangeName"), "routingKeys" -> config.getString("QueueRoutingKey"))
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)

val jsonStream = receiverStream.map(byteData => {
JSONValue.parse(byteData)
})
jsonStream.filter(json => {
var customerType = json.get("customerType")
if(customerType.equals("consumer"))
true
else
false
}).foreachRDD(rdd => {
rdd.foreach(json => {
println("json " + json)
})
})

jsonStream.filter(json => {
var customerType = json.get("customerType")
if(customerType.equals("non-consumer"))
true
else
false
}).foreachRDD(rdd => {
rdd.foreach(json => {
println("json " + json)
})
})
ssc.start()
ssc.awaitTermination()

在上面的代码片段中,我首先从接收到的流创建 jsonStream,然后根据客户类型从中创建两个不同的流,然后对它们应用 (foreachRDD) 操作以打印结果。

以类似的方式,您可以将相同的数据流传递给两个不同的类,并应用其中的转换和操作来计算不同的特征集。

希望以上解释能帮助您解决问题。

谢谢,
霍卡姆

关于apache-spark - 如何在同一个 Spark Streaming 上运行多个操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39246222/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com