
scala - Convert an RDD to a DataFrame in Spark/Scala


The RDD is created in the format Array[Array[String]] and has the following values:

val rdd: Array[Array[String]] = Array(
  Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
  Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1"))

I want to create a DataFrame with the schema:

val schemaString = "callId oCallId callTime duration calltype swId"
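(The schema value used further down is not shown in the question; presumably it was built from schemaString in the usual way, with every field typed as a string. A sketch of that construction:)

import org.apache.spark.sql.types.{StructType, StructField, StringType}

// one nullable string-typed field per whitespace-separated name
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))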

Next steps:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)

This fails with the following error:

<console>:45: error: overloaded method value createDataFrame with alternatives:
(rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.rdd.RDD[Array[String]],
org.apache.spark.sql.types.StructType)
val calDF = sqlContext.createDataFrame(rowRDD, schema)
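The overload list in the error is the clue: createDataFrame accepts an RDD[Row] paired with a StructType, but rowRDD here is an RDD[Array[String]]. A minimal sketch of the Row-based route, reusing the rdd and schema from above:

import org.apache.spark.sql.Row

// wrap each Array[String] in a Row so the (RDD[Row], StructType) overload applies
val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5).trim))
val calDF = sqlContext.createDataFrame(rowRDD, schema)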

Best answer

Just paste this into a spark-shell:

val a = Array(
  Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
  Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd = sc.makeRDD(a)  // distribute the local array as an RDD[Array[String]]

case class X(callId: String, oCallId: String,
callTime: String, duration: String, calltype: String, swId: String)

Then map() over the RDD to create instances of the case class, and build the DataFrame with toDF():

scala> val df = rdd.map { 
case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
df: org.apache.spark.sql.DataFrame =
[callId: string, oCallId: string, callTime: string,
duration: string, calltype: string, swId: string]

This infers the schema from the case class.
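Because the schema comes from the case class, non-string columns follow for free; a hedged variant (Call is a hypothetical name, not from the original answer) that types the numeric columns:

case class Call(callId: String, oCallId: String, callTime: String,
  duration: Int, calltype: Int, swId: Int)

// parse the numeric fields while mapping, so the inferred schema has integer columns
val typedDF = rdd.map {
  case Array(s0, s1, s2, s3, s4, s5) => Call(s0, s1, s2, s3.toInt, s4.toInt, s5.toInt)
}.toDF()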

Then you can proceed with:

scala> df.printSchema()
root
|-- callId: string (nullable = true)
|-- oCallId: string (nullable = true)
|-- callTime: string (nullable = true)
|-- duration: string (nullable = true)
|-- calltype: string (nullable = true)
|-- swId: string (nullable = true)

scala> df.show()
+----------+-------+-------------------+--------+--------+----+
| callId|oCallId| callTime|duration|calltype|swId|
+----------+-------+-------------------+--------+--------+----+
|4580056797| 0|2015-07-29 10:38:42| 0| 1| 1|
|4580056797| 0|2015-07-29 10:38:42| 0| 1| 1|
+----------+-------+-------------------+--------+--------+----+
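From here the usual DataFrame operations apply; for example, you could register the frame as a temp table and query it with SQL (Spark 1.x API; the table name calls is a hypothetical choice):

df.registerTempTable("calls")
sqlContext.sql("SELECT callId, COUNT(*) AS n FROM calls GROUP BY callId").show()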

If you want to use toDF() in a regular program (rather than in the spark-shell), make sure that you (see the sketch after this list):

  • import sqlContext.implicits._ right after creating the SQLContext
  • define the case class outside of the method that uses toDF()
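A minimal standalone skeleton illustrating both points (CallsToDF and the app name are placeholders, not from the original answer):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// the case class lives outside the method that calls toDF()
case class X(callId: String, oCallId: String, callTime: String,
  duration: String, calltype: String, swId: String)

object CallsToDF {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CallsToDF"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._  // immediately after creating the SQLContext

    val rdd = sc.makeRDD(Array(
      Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")))
    val df = rdd.map { case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
    df.show()
    sc.stop()
  }
}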

For scala - converting an RDD to a DataFrame in Spark/Scala, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/33127970/
