mongodb - How to store processed data from HDFS into MongoDB using MapReduce

I have a MapReduce application that processes data from HDFS and stores its output back into HDFS.

However, I now need to store the output data in MongoDB instead of writing it to HDFS.

Could anyone tell me how to do that?

Thanks

Mapper class

package com.mapReduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String pART;
    private String actual;
    private String fdate;

    // Input records are CSV lines: date (MM/DD/YYYY), part number,
    // then the weekly values, with the actual value as the last field.
    public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
        String tempString = ivalue.toString();
        String[] data = tempString.split(",");
        pART = data[1];
        try {
            fdate = convertyymmdd(data[0]);
            /** IF ACTUAL IS LAST HEADER
             * actual = data[2];
             */
            actual = data[data.length - 1];
            context.write(new Text(pART), new Text(fdate + "," + actual + "," + dynamicVariables(data)));
        } catch (ArrayIndexOutOfBoundsException ae) {
            System.err.println(ae.getMessage());
        }
    }

    // Converts MM/DD/YYYY to YYYY/MM/DD so dates sort chronologically as strings.
    public static String convertyymmdd(String date) {
        String[] data = date.split("/");
        String month = data[0];
        String day = data[1];
        String year = data[2];
        String dateInString = year + "/" + month + "/" + day;
        System.out.println(dateInString);
        return dateInString;
    }

    // Joins the fields between the part number and the trailing actual
    // value back into a single CSV string.
    public static String dynamicVariables(String[] data) {
        StringBuilder str = new StringBuilder();
        boolean isfirst = true;
        /** IF ACTUAL IS LAST HEADER
         * for (int i = 3; i < data.length; i++) { */
        for (int i = 2; i < data.length - 1; i++) {
            if (isfirst) {
                str.append(data[i]);
                isfirst = false;
            } else {
                str.append("," + data[i]);
            }
        }
        return str.toString();
    }
}

Reducer class

package com.mapReduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.ihub.bo.ForcastBO;

public class FReducer extends Reducer<Text, Text, Text, Text> {

    private String pART;
    private List<ForcastBO> list = null;
    private List<List<String>> listOfList = null;
    private List<String> vals = null;
    private static List<ForcastBO> forcastBos = new ArrayList<ForcastBO>();

    @Override
    public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        pART = _key.toString();
        // Collect one ForcastBO per input record for this part.
        for (Text val : values) {
            String tempString = val.toString();
            String[] data = tempString.split(",");
            ForcastBO fb = new ForcastBO();
            fb.setPart(pART);
            fb.setDate(data[0]);
            fb.setActual(data[1]);
            fb.setW0(data[2]);
            fb.setW1(data[3]);
            fb.setW2(data[4]);
            fb.setW3(data[5]);
            fb.setW4(data[6]);
            fb.setW5(data[7]);
            fb.setW6(data[8]);
            fb.setW7(data[9]);
            try {
                list.add(fb);
            } catch (Exception ae) {
                System.out.println(ae.getStackTrace() + "****" + ae.getMessage() + "*****" + ae.getLocalizedMessage());
            }
        }
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            while (context.nextKey()) {
                listOfList = new ArrayList<List<String>>();
                list = new ArrayList<ForcastBO>();
                reduce(context.getCurrentKey(), context.getValues(), context);
                files_WE(listOfList, list, context);
            }
        } finally {
            cleanup(context);
        }
    }

    public void files_WE(List<List<String>> listOfList, List<ForcastBO> list, Context context) {
        Collections.sort(list);

        try {
            setData(listOfList, list);

            Collections.sort(listOfList, new Comparator<List<String>>() {
                @Override
                public int compare(List<String> o1, List<String> o2) {
                    return o1.get(0).compareTo(o2.get(0));
                }
            });

            // Shift the weekly columns so each row picks them up
            // from the preceding rows.
            for (int i = listOfList.size() - 1; i > -1; i--) {
                List<String> list1 = listOfList.get(i);
                int k = 1;
                for (int j = 3; j < list1.size(); j++) {
                    try {
                        list1.set(j, listOfList.get(i - k).get(j));
                    } catch (Exception ex) {
                        list1.set(j, null);
                    }
                    k++;
                }
            }
        } catch (Exception e) {
            //e.getLocalizedMessage();
        }

        for (List<String> ls : listOfList) {
            System.out.println(ls.get(0));
            ForcastBO forcastBO = new ForcastBO();
            try {
                forcastBO.setPart(ls.get(0));
                forcastBO.setDate(ls.get(1));
                forcastBO.setActual(ls.get(2));
                forcastBO.setW0(ls.get(3));
                forcastBO.setW1(ls.get(4));
                forcastBO.setW2(ls.get(5));
                forcastBO.setW3(ls.get(6));
                forcastBO.setW4(ls.get(7));
                forcastBO.setW5(ls.get(8));
                forcastBO.setW6(ls.get(9));
                forcastBO.setW7(ls.get(10));
                forcastBos.add(forcastBO);
            } catch (Exception e) {
                forcastBos.add(forcastBO);
            }
            try {
                System.out.println(forcastBO);
                //service.setForcastBOs(forcastBos);
            } catch (Exception e) {
                System.out.println("FB::::" + e.getStackTrace());
            }
        }
    }

    public void setData(List<List<String>> listOfList, List<ForcastBO> list) {
        List<List<String>> temListOfList = new ArrayList<List<String>>();
        for (ForcastBO str : list) {
            vals = new ArrayList<String>();
            vals.add(str.getPart());
            vals.add(str.getDate());
            vals.add(str.getActual());
            vals.add(str.getW0());
            vals.add(str.getW1());
            vals.add(str.getW2());
            vals.add(str.getW3());
            vals.add(str.getW4());
            vals.add(str.getW5());
            vals.add(str.getW6());
            vals.add(str.getW7());
            temListOfList.add(vals);
        }

        // Sort the rows by date (index 1) before handing them back to files_WE.
        Collections.sort(temListOfList, new Comparator<List<String>>() {
            @Override
            public int compare(List<String> o1, List<String> o2) {
                return o1.get(1).compareTo(o2.get(1));
            }
        });

        for (List<String> ls : temListOfList) {
            System.out.println(ls);
            listOfList.add(ls);
        }
    }

    public static List<ForcastBO> getForcastBos() {
        return forcastBos;
    }
}

Driver class

package com.mapReduce;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(MRDriver.class);
        job.setMapperClass(FMapper.class);
        job.setReducerClass(FReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Delete the output directory left over from a previous run, if any.
        FileSystem hdfs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
        Path workingDir = hdfs.getWorkingDirectory();

        Path newFolderPath = new Path("/sd1");
        newFolderPath = Path.mergePaths(workingDir, newFolderPath);
        if (hdfs.exists(newFolderPath)) {
            hdfs.delete(newFolderPath, true); // recursive delete of the directory
        }

        // Input and output paths are directories, not files.
        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/Forcast/SampleData"));
        FileOutputFormat.setOutputPath(job, newFolderPath);

        if (!job.waitForCompletion(true))
            return;
    }
}

Best Answer

Basically, you need to change the output format class of the job, and there are several ways to do it:

  • Use the MongoDB Connector for Hadoop: http://docs.mongodb.org/ecosystem/tools/hadoop/?_ga=1.111209414.370990604.1441913822 (a driver sketch follows this list).
  • Implement your own OutputFormat: https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/OutputFormat.html (instead of using FileOutputFormat).
  • Perform the MongoDB inserts in the reducer instead of writing to the MapReduce context (not ideal: depending on the OutputFormat specified in the driver, you can end up with empty output files in HDFS; see the reducer sketch further below).
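For option 1, here is a minimal driver sketch. It assumes the mongo-hadoop connector jar is on the classpath; the MongoOutputFormat class and the mongo.output.uri key come from that project, so verify them against the connector version you use. The class name MongoMRDriver and the test.forecast database/collection are placeholders, and you should check which key/value types the connector's record writer accepts — you may need to emit BSONWritable values rather than Text.

package com.mapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.mongodb.hadoop.MongoOutputFormat;

public class MongoMRDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Tell the connector which collection to write to
        // ("test.forecast" is a placeholder database.collection).
        conf.set("mongo.output.uri", "mongodb://localhost:27017/test.forecast");

        Job job = Job.getInstance(conf, "HDFS to MongoDB");
        job.setJarByClass(MongoMRDriver.class);
        job.setMapperClass(FMapper.class);
        job.setReducerClass(FReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Replace FileOutputFormat with the connector's output format;
        // no HDFS output path (and no temp-dir cleanup) is needed.
        job.setOutputFormatClass(MongoOutputFormat.class);

        FileInputFormat.setInputPaths(job,
                new Path("hdfs://localhost:9000/Forcast/SampleData"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}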

    In my opinion, option 1 is the best choice, but I haven't used the MongoDB connector enough to say whether it is stable and feature-complete. Option 2 requires you to really understand how Hadoop works under the hood, to avoid ending up with lots of open connections and problems with transactions and Hadoop task retries.
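If you go with option 3 despite the caveat above, the usual pattern is to open one client per task in setup() and close it in cleanup(). A minimal sketch, assuming the current MongoDB Java sync driver is available; the class name MongoWritingReducer, the connection URI, and the test/forecast database and collection names are placeholders, and the data[0]/data[1] indexing mirrors the date,actual,... value layout that FMapper emits:

package com.mapReduce;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.Document;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;

public class MongoWritingReducer extends Reducer<Text, Text, NullWritable, NullWritable> {

    private MongoClient client;
    private MongoCollection<Document> collection;

    @Override
    protected void setup(Context context) {
        // One client per reduce task, reused across all keys.
        client = MongoClients.create("mongodb://localhost:27017");
        collection = client.getDatabase("test").getCollection("forecast");
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            String[] data = val.toString().split(",");
            // One document per record, keyed by part number. Nothing is
            // written to the MapReduce context, so the HDFS output
            // directory only ends up with empty files.
            collection.insertOne(new Document("part", key.toString())
                    .append("date", data[0])
                    .append("actual", data[1]));
        }
    }

    @Override
    protected void cleanup(Context context) {
        client.close();
    }
}

Option 2 amounts to the same insertOne logic, but moved into a RecordWriter returned by your own OutputFormat subclass, which lets Hadoop manage the connection lifecycle per task attempt.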

  • Regarding "mongodb - How to store processed data from HDFS into MongoDB using MapReduce", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/32500080/
