
mongodb - Reading from MongoDB in Scala


I want to create a standalone Scala program that reads from MongoDB with custom settings, based on this code on the MongoDB website.

When I run sbt package I get some errors. I guess it is related to creating the SparkSession the wrong way. Can you give me a hint on how to fix it?

My build.sbt contents:

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
  "org.apache.spark" %% "spark-core" % "2.4.1",
  "org.apache.spark" %% "spark-sql" % "2.4.1"
)
FirstApp.scala code:
package com.mongodb

import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import com.mongodb.spark.MongoSpark
import org.bson.Document

object FirstApp {
  def main(args: Array[String]) {

    val sc = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate()

    val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
    val customRdd = MongoSpark.load(sc, readConfig)

    println(customRdd.count)
    println(customRdd.first.toJson)
  }
}

And the error after running sbt package:
    value toJson is not a member of org.apache.spark.sql.Row
[error] println(customRdd.first.toJson)
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 10 s, completed Jun 10, 2020 6:10:50 PM
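
A note on the error itself: when MongoSpark.load is given a SparkSession (the sc value above), it returns a DataFrame, and org.apache.spark.sql.Row has no toJson method; toJson exists on org.bson.Document, which you only get when loading through a SparkContext. A minimal sketch of the two options, assuming the sc and readConfig defined above:

// Option A: keep the DataFrame and use Dataset.toJSON, which yields JSON strings
val df = MongoSpark.load(sc, readConfig)   // sc is a SparkSession, so this is a DataFrame
println(df.count())
println(df.toJSON.first())

// Option B: load through the SparkContext to get an RDD of org.bson.Document,
// whose elements do have toJson
val rdd = MongoSpark.load(sc.sparkContext, readConfig)
println(rdd.first.toJson)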

EDIT 1:

I tried the solution, but it does not compile correctly. The build.sbt contents are the same as above. I changed SimpleApp.scala to:
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

object FirstApp {
  def main(args: Array[String]) {

    val spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate()
    val sc = spark.sparkContext

    val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
    val customRdd = MongoSpark.load(sc)
    println(customRdd.count())
    println(customRdd.first.toJson)
  }
}

The result of running it with spark-submit:
$ spark-submit   --class "FirstApp"   --master local[4]   target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar 
20/06/12 07:09:53 WARN Utils: Your hostname, Project resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
20/06/12 07:09:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/06/12 07:09:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/06/12 07:09:54 INFO SparkContext: Running Spark version 2.4.5
20/06/12 07:09:54 INFO SparkContext: Submitted application: MongoSparkConnectorIntro
20/06/12 07:09:55 INFO SecurityManager: Changing view acls to: sadegh
20/06/12 07:09:55 INFO SecurityManager: Changing modify acls to: sadegh
20/06/12 07:09:55 INFO SecurityManager: Changing view acls groups to:
20/06/12 07:09:55 INFO SecurityManager: Changing modify acls groups to:
20/06/12 07:09:55 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sadegh); groups with view permissions: Set(); users with modify permissions: Set(sadegh); groups with modify permissions: Set()
20/06/12 07:09:55 INFO Utils: Successfully started service 'sparkDriver' on port 33031.
20/06/12 07:09:55 INFO SparkEnv: Registering MapOutputTracker
20/06/12 07:09:55 INFO SparkEnv: Registering BlockManagerMaster
20/06/12 07:09:55 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/06/12 07:09:55 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/06/12 07:09:55 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7405e1be-08e8-4f58-b88e-b8f01f8fe87e
20/06/12 07:09:55 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/06/12 07:09:55 INFO SparkEnv: Registering OutputCommitCoordinator
20/06/12 07:09:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
20/06/12 07:09:55 INFO Utils: Successfully started service 'SparkUI' on port 4041.
20/06/12 07:09:56 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.2.15:4041
20/06/12 07:09:56 INFO SparkContext: Added JAR file:/Folder/target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar at spark://10.0.2.15:33031/jars/root-2_2.11-0.1.0-SNAPSHOT.jar with timestamp 1591938596069
20/06/12 07:09:56 INFO Executor: Starting executor ID driver on host localhost
20/06/12 07:09:56 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42815.
20/06/12 07:09:56 INFO NettyBlockTransferService: Server created on 10.0.2.15:42815
20/06/12 07:09:56 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/06/12 07:09:56 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.2.15:42815 with 366.3 MB RAM, BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.2.15, 42815, None)
Exception in thread "main" java.lang.NoClassDefFoundError: com/mongodb/spark/config/ReadConfig$
at FirstApp$.main(SimpleApp.scala:16)
at FirstApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.config.ReadConfig$
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 14 more
20/06/12 07:09:56 INFO SparkContext: Invoking stop() from shutdown hook
20/06/12 07:09:56 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4041
20/06/12 07:09:56 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/06/12 07:09:56 INFO MemoryStore: MemoryStore cleared
20/06/12 07:09:56 INFO BlockManager: BlockManager stopped
20/06/12 07:09:56 INFO BlockManagerMaster: BlockManagerMaster stopped
20/06/12 07:09:56 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/06/12 07:09:56 INFO SparkContext: Successfully stopped SparkContext
20/06/12 07:09:56 INFO ShutdownHookManager: Shutdown hook called
20/06/12 07:09:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-7f90ac08-403c-4a3f-bb45-ea24a347c380
20/06/12 07:09:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-78cb32aa-c6d1-4ba4-b94f-16d3761d181b

EDIT 2:

I added .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1") to SimpleApp.scala, but the error is still the same as in the EDIT 1 section:
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

object FirstApp {
  def main(args: Array[String]) {

    val spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
      .getOrCreate()
    val sc = spark.sparkContext

    val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
    val customRdd = MongoSpark.load(sc)
    println(customRdd.count())
    println(customRdd.first.toJson)
  }
}
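
A hedged aside on why this still fails: when the jar is launched with spark-submit, the driver needs the connector classes on its classpath while the application classes are being loaded, so a spark.jars.packages value set inside the already-running application comes too late to help. The usual alternatives are passing --packages on the spark-submit command line (assuming the machine can reach Maven Central), or building a fat jar as in the answer below. For example:

spark-submit \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 \
  --class "FirstApp" \
  --master local[4] \
  target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar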

Best Answer

Here are the detailed steps for creating a Scala project that reads data from MongoDB with Apache Spark.

You can use an IDE, or create the project manually, with the following files:

  • SparkMongo/project/plugins.sbt
  • SparkMongo/src/main/scala/com/test/FirstMongoSparkApp.scala
  • SparkMongo/build.sbt

    project/plugins.sbt
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

    build.sbt
    name := "SparkMongo"
    version := "0.1"
    scalaVersion := "2.11.12"

    val sparkVersion = "2.4.1"
    val mongoSparkVersion = "2.4.1"

    libraryDependencies ++= Seq(
      "org.mongodb.spark" %% "mongo-spark-connector" % mongoSparkVersion,
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-sql" % sparkVersion
    )

    assemblyJarName in assembly := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar"

    assemblyMergeStrategy in assembly := {
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
      case x => MergeStrategy.first
    }
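
    As an optional variant (an addition here, not part of the original answer): the Spark modules can be marked "provided" so the assembly jar only bundles the MongoDB connector; spark-submit already supplies Spark's own classes at runtime, and the fat jar stays much smaller. A sketch of the dependency list above with that change:

    // Dependency list with Spark itself excluded from the fat jar
    libraryDependencies ++= Seq(
      "org.mongodb.spark" %% "mongo-spark-connector" % mongoSparkVersion,
      "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
      "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
    )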

    SparkMongo/src/main/scala/com/test/FirstMongoSparkApp.scala
    package com.test

    import com.mongodb.spark.MongoSpark
    import com.mongodb.spark.config.ReadConfig
    import org.apache.spark.sql.SparkSession

    object FirstMongoSparkApp extends App {

      val spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkProject")
        .config("spark.mongodb.input.uri", "mongodb://localhost/test.cities")
        .config("spark.mongodb.output.uri", "mongodb://localhost/test.outputCities")
        .getOrCreate()

      import spark.implicits._

      val readConfig = ReadConfig(Map("collection" -> "cities", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(spark.sparkContext)))
      val customRdd = MongoSpark.load(spark.sparkContext, readConfig)

      customRdd.toDF().show(false)

    }
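
    As a further sketch (an addition, not part of the original answer): the same collection can also be read directly as a DataFrame, which avoids the Row/toJson issue from the question, and the configured output URI can be exercised with a write. This assumes the spark session and readConfig defined above:

    // Read as a DataFrame instead of an RDD[Document]
    val citiesDF = MongoSpark.load(spark, readConfig)
    citiesDF.printSchema()
    println(citiesDF.toJSON.first())   // Dataset.toJSON yields JSON strings

    // Write back through spark.mongodb.output.uri (test.outputCities configured above)
    MongoSpark.save(citiesDF.write.mode("append"))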

    Now you can run sbt assembly, which will generate the jar file SparkMongo_2.11-0.1.jar.
    You can submit the jar file with:
    spark-submit --class "com.test.FirstMongoSparkApp" --master "local" target/scala-2.11/SparkMongo_2.11-0.1.jar

    To run it smoothly, make sure your Spark version matches the one in the dependencies, in this case 2.4.1, and that your MongoDB version is 2.6+.

    Regarding "mongodb - Reading from MongoDB in Scala", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62307775/
