gpt4 book ai didi

scala - 使用 Spark 和 Scala 将数据插入 Hive 表时出现问题

转载 作者:可可西里 更新时间:2023-11-01 16:39:47 28 4
gpt4 key购买 nike

我是 Spark 的新手。这是我想做的事情。

我创建了两个数据流;第一个从文本文件中读取数据并使用 hivecontext 将其注册为 temptable。另一个不断从 Kafka 获取 RDD,对于每个 RDD,它创建数据流并将内容注册为 temptable。最后,我将这两个临时表连接到一个键上以获得最终结果集。我想将该结果集插入配置单元表中。但我没有想法。试图遵循一些示例,但只在配置单元中创建一个包含一列的表,而且该表也不可读。你能告诉我如何在特定的数据库和配置单元表中插入结果吗?请注意,我可以使用 show 函数看到连接的结果,因此真正的挑战在于插入配置单元表。

下面是我使用的代码。

    imports.....

object MSCCDRFilter {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Flume, Kafka and Spark MSC CDRs Manipulation")

val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val cgiDF = sc.textFile("file:///tmp/omer-learning/spark/dim_cells.txt").map(_.split(",")).map(p => CGIList(p(0).trim, p(1).trim, p(2).trim,p(3).trim)).toDF()
cgiDF.registerTempTable("my_cgi_list")
val CGITable=sqlContext.sql("select *"+
" from my_cgi_list")
CGITable.show() // this CGITable is a structure I defined in the project
val streamingContext = new StreamingContext(sc, Seconds(10)
val zkQuorum="hadoopserver:2181"
val topics=Map[String, Int]("FlumeToKafka"->1)

val messages: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(streamingContext,zkQuorum,"myGroup",topics)

val logLinesDStream = messages.map(_._2) //获取数据
logLinesDStream.print()
val MSCCDRDStream = logLinesDStream.map(MSC_KPI.parseLogLine) // change MSC_KPI to MCSCDR_GO if you wanna change the class
// MSCCDR_GO and MSC_KPI are structures defined in the project
MSCCDRDStream.foreachRDD(MSCCDR => {
println("+++++++++++++++++++++NEW RDD ="+ MSCCDR.count())

if (MSCCDR.count() == 0) {
println("==================No logs received in this time interval=================")
} else {

val dataf=sqlContext.createDataFrame(MSCCDR)
dataf.registerTempTable("hive_msc")
cgiDF.registerTempTable("my_cgi_list")
val sqlquery=sqlContext.sql("select a.cdr_type,a.CGI,a.cdr_time, a.mins_int, b.Lat, b.Long,b.SiteID from hive_msc a left join my_cgi_list b"
+" on a.CGI=b.CGI")
sqlquery.show()
sqlContext.sql("SET hive.exec.dynamic.partition = true;")
sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict;")
sqlquery.write.mode("append").partitionBy("CGI").saveAsTable("omeralvi.msc_data")

val FilteredCDR = sqlContext.sql("select p.*, q.* " +
" from MSCCDRFiltered p left join my_cgi_list q " +
"on p.CGI=q.CGI ")

println("======================print result =================")
FilteredCDR.show()
streamingContext.start()
streamingContext.awaitTermination()
}
}

最佳答案

我使用以下方法成功写入 Hive:

dataFrame
.coalesce(n)
.write
.format("orc")
.options(Map("path" -> savePath))
.mode(SaveMode.Append)
.saveAsTable(fullTableName)

我们尝试使用分区的尝试没有得到落实,因为我认为我们所需的分区列存在一些问题。

唯一的限制是并发写入,其中表尚不存在,然后任何任务尝试创建表(因为它在第一次尝试写入表时不存在)都会抛出异常。

请注意,在流式应用程序中写入 Hive 通常是糟糕的设计,因为您经常会写入许多小文件,读取和存储效率非常低。因此,如果您向 Hive 写入的频率超过每小时左右,则应确保包含压缩逻辑,或添加更适合事务数据的中间存储层。

关于scala - 使用 Spark 和 Scala 将数据插入 Hive 表时出现问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43634017/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com