
sql - Trying to run SparkSQL over Spark Streaming

Reposted. Author: 行者123. Updated: 2023-12-04 13:38:59

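I'm trying to run SQL queries over streaming data in Spark. This looks straightforward, but when I try it I get the error "Table Not Found: <tablename>". Spark cannot find the table I registered.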

Using Spark SQL with batch data works fine, so I think the problem has to do with how I'm calling streamingcontext.start(). Any idea what the problem is? Here is the code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

// Assumed definition: the original post uses Persons without showing it
case class Persons(name: String, age: Int)

object Streaming {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the streaming context with a 2-second batch interval
    val ssc = new StreamingContext(sc, Seconds(2))

    val sqc = new SQLContext(sc)
    import sqc.createSchemaRDD

    // Create the file input stream and register each batch as the table "data"
    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    lines.foreachRDD(rdd => rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data"))
    // lines.foreachRDD(rdd=>rdd.foreach(println))
    // Query the registered table (this is the call that fails)
    val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
    ssc.start()
    ssc.awaitTermination()
  }
}

Any suggestions are welcome. Thanks.

Best answer

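OK, I figured out the problem. You have to query the data inside the foreachRDD function; otherwise the table is not recognized. Something like this works: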

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

// Assumed definition: the original post uses Persons without showing it
case class Persons(name: String, age: Int)

object Mlist {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the streaming context with a 2-second batch interval
    val ssc = new StreamingContext(sc, Seconds(2))

    // Create the file input stream and print each incoming batch
    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    lines.foreachRDD(rdd => rdd.foreach(println))

    val sqc = new SQLContext(sc)
    import sqc.createSchemaRDD

    // Register the table and query it inside the same foreachRDD call,
    // so the query runs once per batch, after the table exists
    lines.foreachRDD(rdd => {
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
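
To illustrate why the placement of the query matters, here is a minimal sketch that does not use Spark at all. It assumes a toy model in which foreachRDD only stores its function and runs it later, once per batch, while a query written outside it runs immediately during setup. ToyStream, catalog, deliver and the sample rows are hypothetical names and data made up for this illustration; they are not Spark APIs and not how Spark is implemented.

```scala
import scala.collection.mutable

// Toy stand-in for a DStream (hypothetical, not a Spark class):
// foreachRDD only stores the callback, it does not run it.
final class ToyStream[A] {
  private val callbacks = mutable.ArrayBuffer.empty[Seq[A] => Unit]

  def foreachRDD(f: Seq[A] => Unit): Unit = callbacks += f

  // Stand-in for a batch arriving after the streaming context has started.
  def deliver(batch: Seq[A]): Unit = callbacks.foreach(_(batch))
}

object DeferredDemo {
  final case class Person(name: String, age: Int)

  // Toy table catalog (hypothetical): table name -> rows.
  val catalog = mutable.Map.empty[String, Seq[Person]]

  // Toy version of the teenagers query; fails if the table is not registered.
  def teenagers(table: String): Either[String, Seq[String]] =
    catalog.get(table) match {
      case Some(rows) => Right(rows.filter(p => p.age >= 13 && p.age <= 19).map(_.name))
      case None       => Left(s"Table Not Found: $table")
    }

  def parse(line: String): Person = {
    val p = line.split(",")
    Person(p(0), p(1).trim.toInt)
  }

  // Returns (result of the query outside the callback, result inside it).
  def run(): (Either[String, Seq[String]], Either[String, Seq[String]]) = {
    catalog.clear()
    val lines = new ToyStream[String]
    var inside: Either[String, Seq[String]] = Left("callback never ran")

    lines.foreachRDD { batch =>
      catalog("data") = batch.map(parse) // register the table for this batch
      inside = teenagers("data")         // query after registration
    }

    // Runs at setup time, before any batch has registered "data".
    val outside = teenagers("data")

    // Made-up sample rows standing in for the contents of a new file.
    lines.deliver(Seq("Michael, 29", "Andy, 30", "Justin, 19"))
    (outside, inside)
  }

  def main(args: Array[String]): Unit = {
    val (outside, inside) = run()
    println(s"outside: $outside")
    println(s"inside:  $inside")
  }
}
```

In this toy model the outside query fails with a table-not-found message because nothing has been registered yet, while the query inside the callback finds the table and returns the one row in the 13 to 19 age range.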

Regarding sql - Trying to run SparkSQL over Spark Streaming, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/25418715/
