
spark-streaming - How to convert a DStream[(Array[String], Long)] to a DataFrame in Spark Streaming


I am trying to convert my DStream into a DataFrame. This is the code I am using for the conversion:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// `spark` is an existing SparkSession
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "ffff.dl.uk.fff.com:8002",
  "security.protocol" -> "SASL_PLAINTEXT",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "1",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("mytopic")
val from_kafkastream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// (value, timestamp) pairs, then split the CSV value into its fields
val strmk = from_kafkastream.map(record => (record.value, record.timestamp))
val splitup2 = strmk.map { case (line1, line2) => (line1.split(","), line2) }

case class Record(name: String, trQ: String, traW: String, traNS: String,
                  traned: String, tranS: String, transwer: String, trABN: String,
                  kafkatime: Long)

object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

splitup2.foreachRDD { rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  spark.sparkContext.setLogLevel("ERROR")
  import sqlContext.implicits._
  val requestsDataFrame = rdd.map(w => Record(w(0).toString,
    w(1).toString, w(2).toString, w(3).toString, w(4).toString,
    w(5).toString, w(6).toString, w(7).toString, w(8).toString)).toDF()
  // I am getting the issue here
  requestsDataFrame.show()
}
ssc.start()

I am getting the following error message (posted as a screenshot, not reproduced here).

Can someone help me convert my DStream to a DataFrame? I am new to the Spark world.

Best Answer

The error is probably in the construction of the Record object: you never pass kafkatime, only String values, and since each element is a tuple you cannot index into the array with w(0) in that form.
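To see the mismatch concretely: each element of splitup2 is a pair (Array[String], Long), so the fields must be reached through ._1 and the timestamp through ._2 (a minimal illustration with made-up values):

// Each element of splitup2 is a Tuple2: (CSV fields, Kafka timestamp)
val w: (Array[String], Long) = (Array("a", "b", "c"), 1549000000000L)

w._1(0)  // "a": the first CSV field
w._2     // 1549000000000L: the record timestamp
// w(0)  // does not compile: Tuple2 has no apply(Int) method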

You can try this:

import sqlContext.implicits._

val requestsDataFrame = rdd.map(w => Record(
  w._1(0).toString,
  w._1(1).toString, w._1(2).toString, w._1(3).toString, w._1(4).toString,
  w._1(5).toString, w._1(6).toString, w._1(7).toString, w._2))

requestsDataFrame.toDF()
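Put back into the question's foreachRDD, the whole block would look roughly like this (a sketch assuming the same Record case class and splitup2 stream as above; the .toString calls are dropped since split already yields Strings):

splitup2.foreachRDD { rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  // Destructure each (Array[String], Long) element, then build a Record
  // from the eight CSV fields plus the Kafka timestamp
  val requestsDataFrame = rdd.map { case (fields, kafkaTime) =>
    Record(fields(0), fields(1), fields(2), fields(3),
      fields(4), fields(5), fields(6), fields(7), kafkaTime)
  }.toDF()

  requestsDataFrame.show()
}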

Regarding "spark-streaming - How to convert a DStream[(Array[String], Long)] to a DataFrame in Spark Streaming", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54610280/
