gpt4 book ai didi

java - 使用Java Spark加载现有Mongodb到Hive

转载 作者:行者123 更新时间:2023-12-01 18:02:08 25 4
gpt4 key购买 nike

目标

我正在使用 Spark (2.3.1) 和 Java 将 ETL Mongodb 转换为 Hive

我在哪里 RN

我可以加载现有的 Mongodb 并显示/查询数据

问题

但我在将其保存到配置单元表时遇到问题。

Mongodb数据结构

当前mongodb数据是复杂的嵌套dict(结构体类型),有没有办法更方便地转换成保存在hive中?

public static void main(final String[] args) throws InterruptedException {
// spark session read mongodb
SparkSession mongo_spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("mongo_spark.master", "local")
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/test_db.test_collection")
.config("spark.mongodb.output.uri", "mongodb://localhost:27017/test_db.test_collection")
.enableHiveSupport()
.getOrCreate();

// Create a JavaSparkContext using the SparkSession's SparkContext object
JavaSparkContext jsc = new JavaSparkContext(mongo_spark.sparkContext());

// Load data and infer schema, disregard toDF() name as it returns Dataset
Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
implicitDS.printSchema();
implicitDS.show();

// createOrReplaceTempView
implicitDS.createOrReplaceTempView("my_table");
// mongo_spark.sql("DROP TABLE IF EXISTS my_table");
// cannot save table this step
// implicitDS.write().saveAsTable("my_table");
// can query the temp view
mongo_spark.sql("SELECT * FROM my_table limit 1").show();

// More application logic would go here...
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
System.out.println(rdd.count());
System.out.println(rdd.first().toJson());

jsc.close();
}

有人有用 Java 进行 ETL Spark 工作的经验吗?我真的很感激。

最佳答案

随着更多的研究,我意识到这是一个广泛的问题。这个问题的准确答案是

public static void main(final String[] args) throws InterruptedException {
// spark session read mongodb
SparkSession mongo_spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("mongo_spark.master", "local")
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/test_db.test_collection")
.config("spark.mongodb.output.uri", "mongodb://localhost:27017/test_db.test_collection")
.enableHiveSupport()
.getOrCreate();

// Create a JavaSparkContext using the SparkSession's SparkContext object
JavaSparkContext jsc = new JavaSparkContext(mongo_spark.sparkContext());

// Load data and infer schema, disregard toDF() name as it returns Dataset
Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
implicitDS.printSchema();
implicitDS.show();

// createOrReplaceTempView
implicitDS.createOrReplaceTempView("my_table");
mongo_spark.sql("DROP TABLE IF EXISTS my_table");
implicitDS.write().saveAsTable("my_table");

jsc.close();
}

所以实际上代码是有效的,但阻碍我的是我的数据中发生了一些事情

  1. 单个字段的数据类型冲突(com.mongodb.spark.exceptions.MongoTypeConversionException:无法转换...) - 这可以解决在加载时增加样本大小,检查java语法 How to config Java Spark sparksession samplesize

  2. 嵌套结构中的 nulltype - 我仍在寻找 Java 中的解决方案

由于我获得了 Scala 代码示例,我会尽力记录我的发现,希望有一天它可以节省您的时间

关于java - 使用Java Spark加载现有Mongodb到Hive,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60611653/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com