
mongodb - Exporting Data from HDFS to MongoDB


I am trying to move data from HDFS to MongoDB. I can do this from the command line as follows:

hadoop fs -text "/user/name.txt" | mongoimport --host 127.0.0.1:27018 -d cds -c hello --type tsv --headerline

I need to write Scala code for this. I have multiple files in the file system. I have looked at the mongo-hadoop connector, but I need the opposite direction: read the files from HDFS and dump them into MongoDB from Scala.
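The most literal Scala version simply shells out to the same pipeline once per file. Below is a minimal sketch, assuming hadoop and mongoimport are on the PATH; the file list and connection details are placeholders:

import scala.sys.process._

object HdfsToMongoImport {
  def main(args: Array[String]): Unit = {
    // Placeholder list of HDFS files; in practice this could come from args or a directory listing.
    val files = Seq("/user/name.txt")
    files.foreach { path =>
      // Reproduces the shell pipeline: hadoop fs -text <file> | mongoimport ...
      val exitCode = (Seq("hadoop", "fs", "-text", path) #|
        Seq("mongoimport", "--host", "127.0.0.1:27018",
            "-d", "cds", "-c", "hello",
            "--type", "tsv", "--headerline")).!
      if (exitCode != 0) sys.error(s"mongoimport failed for $path")
    }
  }
}

This keeps the single-threaded behaviour of the original command, which is why a distributed approach is worth considering for many or large files.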

Best Answer

There are several ways to do this. Since the command above runs as a single-threaded application, you can even run the same work distributed by introducing a simple MapReduce job (e.g. complete note here):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;
import org.bson.types.ObjectId;

import com.mongodb.hadoop.io.MongoUpdateWritable;

public class LogReducer extends Reducer<Text, IntWritable, NullWritable, MongoUpdateWritable> {

    @Override
    public void reduce( final Text pKey,
                        final Iterable<IntWritable> pValues,
                        final Context pContext )
            throws IOException, InterruptedException {

        int count = 0;
        for (IntWritable val : pValues) {
            count += val.get();
        }

        // Emit an upsert against MongoDB: increment logs_count for the matching device id.
        BasicBSONObject query = new BasicBSONObject("devices", new ObjectId(pKey.toString()));
        BasicBSONObject update = new BasicBSONObject("$inc", new BasicBSONObject("logs_count", count));
        pContext.write(null, new MongoUpdateWritable(query, update, true, false));
    }

}
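To turn the reducer above into a complete job, the driver has to point the mongo-hadoop connector at the target collection and use MongoOutputFormat instead of a file-based output format. The following is only a driver sketch in Scala under those assumptions; LogMapper, the input path, and the MongoDB URI are hypothetical placeholders:

import com.mongodb.hadoop.MongoOutputFormat
import com.mongodb.hadoop.io.MongoUpdateWritable
import com.mongodb.hadoop.util.MongoConfigUtil
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, NullWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

object LogJobDriver {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Tell the mongo-hadoop connector where the reducer's updates should go (placeholder URI).
    MongoConfigUtil.setOutputURI(conf, "mongodb://127.0.0.1:27018/cds.hello")

    val job = Job.getInstance(conf, "hdfs-to-mongodb")
    job.setJarByClass(classOf[LogReducer])
    job.setMapperClass(classOf[LogMapper])        // hypothetical mapper emitting (Text, IntWritable)
    job.setReducerClass(classOf[LogReducer])
    job.setMapOutputKeyClass(classOf[Text])
    job.setMapOutputValueClass(classOf[IntWritable])
    job.setOutputKeyClass(classOf[NullWritable])
    job.setOutputValueClass(classOf[MongoUpdateWritable])
    // MongoOutputFormat routes the MongoUpdateWritable records to MongoDB instead of HDFS.
    job.setOutputFormatClass(classOf[MongoOutputFormat[NullWritable, MongoUpdateWritable]])

    FileInputFormat.addInputPath(job, new Path("/user"))  // placeholder HDFS input directory
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}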

Or via a Hive table as described here, without writing much code.
Or with Spark, as shown here:
package com.mongodb.spark_examples;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;

import org.bson.Document;

import static java.util.Arrays.asList;

import java.util.HashMap;
import java.util.Map;


public final class WriteToMongoDBWriteConfig {

    public static void main(final String[] args) throws InterruptedException {

        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("MongoSparkConnectorIntro")
                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
                .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
                .getOrCreate();

        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // Create a custom WriteConfig
        Map<String, String> writeOverrides = new HashMap<String, String>();
        writeOverrides.put("collection", "spark");
        writeOverrides.put("writeConcern.w", "majority");
        WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);

        // Create a RDD of 10 documents
        JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map(
                new Function<Integer, Document>() {
                    public Document call(final Integer i) throws Exception {
                        return Document.parse("{spark: " + i + "}");
                    }
                });

        /* Start Example: Save data from RDD to MongoDB */
        MongoSpark.save(sparkDocuments, writeConfig);
        /* End Example */

        jsc.close();

    }

}

Scala
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.SparkSession
import org.bson.Document

object WriteToMongoDBWriteConfig {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://192.168.2.13:28111,192.168.2.14:28112,192.168.2.15:28113/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://192.168.2.13:28111,192.168.2.14:28112,192.168.2.15:28113/test.myCollection")
      .getOrCreate()

    // Override the collection and write concern for this save.
    val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(spark.sparkContext)))
    val sparkDocuments = spark.sparkContext.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))

    MongoSpark.save(sparkDocuments, writeConfig)

  }

}
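Since the question is about loading existing TSV files from HDFS rather than generating documents, a closer fit is to read the files with Spark's CSV reader and save the resulting DataFrame. Here is a sketch under that assumption; the HDFS glob, MongoDB URI, and collection are placeholders mirroring the original mongoimport flags (--type tsv --headerline):

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession

object HdfsTsvToMongo {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local")
      .appName("HdfsTsvToMongo")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27018/cds.hello")
      .getOrCreate()

    // Read every TSV file under the directory; header and tab delimiter mirror
    // the original mongoimport flags.
    val df = spark.read
      .option("header", "true")
      .option("sep", "\t")
      .csv("hdfs:///user/*.txt")

    // Writes to the database.collection named in spark.mongodb.output.uri.
    MongoSpark.save(df)

    spark.stop()
  }
}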

Regarding mongodb - Exporting Data from HDFS to MongoDB, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46770444/
