gpt4 book ai didi

apache-kafka - 读取第一批数据后停止 Spark 流

转载 作者:行者123 更新时间:2023-12-05 01:02:00 25 4
gpt4 key购买 nike

我正在使用 Spark 流来消费 kafka 消息。我想从 kafka 获取一些消息作为样本,而不是读取所有消息。所以我想读取一批消息,将它们返回给调用者并停止 Spark 流。目前我正在 Spark 流上下文方法的 awaitTermination 方法中传递 batchInterval 时间。我现在不知道如何将处理过的数据从 Spark 流返回给调用者。这是我目前正在使用的代码

def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
if (params.contains("zookeeperQourum"))
zkQuorum = params.get("zookeeperQourum").get
if (params.contains("userGroup"))
group = params.get("userGroup").get
if (params.contains("topics"))
topics = params.get("topics").get
if (params.contains("numberOfThreads"))
numThreads = params.get("numberOfThreads").get
if (params.contains("sink"))
sink = params.get("sink").get
if (params.contains("batchInterval"))
interval = params.get("batchInterval").get.toInt
val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
val ssc = new StreamingContext(sparkConf, Seconds(interval))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
var consumerConfig = scala.collection.immutable.Map.empty[String, String]
consumerConfig += ("auto.offset.reset" -> "smallest")
consumerConfig += ("zookeeper.connect" -> zkQuorum)
consumerConfig += ("group.id" -> group)
var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
streams.foreach(rdd => rdd.foreachPartition(itr => {
while (itr.hasNext && size >= 0) {
var msg=itr.next
println(msg)
sample.append(msg)
sample.append("\n")
size -= 1
}
}))
ssc.start()
ssc.awaitTermination(5000)
ssc.stop(true)
}

因此,我不想将消息保存在名为“sample”的字符串构建器中,而是想返回给调用者。

最佳答案

你可以实现一个 StreamingListener 然后在它里面,onBatchCompleted 你可以调用 ssc.stop()

private class MyJobListener(ssc: StreamingContext) extends StreamingListener {

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {

ssc.stop(true)

}

}

这是将 SparkStreaming 附加到 JobListener 的方式:
val listen = new MyJobListener(ssc)
ssc.addStreamingListener(listen)

ssc.start()
ssc.awaitTermination()

关于apache-kafka - 读取第一批数据后停止 Spark 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27251055/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com