gpt4 book ai didi

scala - 使用 Apache Spark 将 MongoDB 数据保存为 parquet 文件格式

转载 作者:可可西里 更新时间:2023-11-01 14:37:31 27 4
gpt4 key购买 nike

我是 Apache spark 和 Scala 编程语言的新手。

我想要实现的是从我的本地 mongoDB 数据库中提取数据,然后将其保存在 parquet format 中。将 Apache Spark 与 hadoop 连接器结合使用

到目前为止,这是我的代码:

package com.examples 
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject
import com.mongodb.hadoop.{MongoInputFormat, BSONFileInputFormat}
import org.apache.spark.sql
import org.apache.spark.sql.SQLContext

object DataMigrator {

def main(args: Array[String])
{
val conf = new SparkConf().setAppName("Migration App").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Import statement to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._

val mongoConfig = new Configuration()
mongoConfig.set("mongo.input.uri", "mongodb://localhost:27017/mongosails4.case")

val mongoRDD = sc.newAPIHadoopRDD(mongoConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject]);

val count = countsRDD.count()

// the count value is aprox 100,000
println("================ PRINTING =====================")
println(s"ROW COUNT IS $count")
println("================ PRINTING =====================")
}
}

问题是,为了将数据保存为 parquet 文件格式,首先需要将 mongoRDD 变量转换为 Spark DataFrame。我试过这样的事情:

// convert RDD to DataFrame
val myDf = mongoRDD.toDF() // this lines throws an error
myDF.write.save("my/path/myData.parquet")

我得到的错误是这样的:线程“主”scala.MatchError 中的异常:java.lang.Object(类 scala.reflect.internal.Types.$TypeRef$$anon$6)

你们还有什么其他想法吗?如何将 RDD 转换为 DataFrame,以便我可以将数据保存为 parquet 格式?

这是 mongoDB 集合中一个文档的结构:https://gist.github.com/kingtrocko/83a94238304c2d654fe4

最佳答案

创建一个 Case 类来表示存储在您的 DBObject 中的数据。
案例类 Data(x: Int, s: String)

然后,将 rdd 的值映射到案例类的实例。val dataRDD = mongoRDD.values.map { obj => Data(obj.get("x"), obj.get("s")) }

现在有了你的 RDD[Data],你可以用 sqlContext 创建一个 DataFrame

val myDF = sqlContext.createDataFrame(dataRDD)

这应该让你继续。如果需要,我可以稍后再解释。

关于scala - 使用 Apache Spark 将 MongoDB 数据保存为 parquet 文件格式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31838468/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com