
hadoop - How to insert MapReduce output into HBase using the same program


I wrote a program that takes a PDF as input and produces its full text as output. I want to use the same program to load this text into HBase. Is there a way to do that? Any help would be appreciated.

//Driver Class
package com.tcs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();

        // Job.getInstance replaces the deprecated Job(conf, name) constructor.
        Job job = Job.getInstance(conf, "Pdftext");
        job.setJarByClass(PdfInputDriver.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(PdfInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        System.out.println(job.waitForCompletion(true));
    }
}

//InputFormatClass
package com.tcs;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new PdfRecordReader();
    }
}

//PDF Record Reader class
package com.tcs;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper;

public class PdfRecordReader extends RecordReader<LongWritable, Text> {

    private String[] lines = null;
    private LongWritable key = null;
    private Text value = null;

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {

        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        final Path file = split.getPath();

        /*
         * Open the file backing this split and extract its text with PDFBox.
         * The whole document is parsed in one go and split into lines, which
         * the reader then emits one key/value pair at a time.
         */
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        PDDocument pdf = PDDocument.load(fileIn);
        try {
            PDFTextStripper stripper = new PDFTextStripper();
            String parsedText = stripper.getText(pdf);
            // Split on newlines ("\n"), not the literal string "/n".
            this.lines = parsedText.split("\n");
        } finally {
            pdf.close();
            fileIn.close();
        }
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            // First call: emit the first line with key 1.
            key = new LongWritable(1);
            value = new Text(lines[0]);
            return true;
        }
        int count = (int) key.get();
        if (count < lines.length - 1) {
            // Emit the next line; the key tracks the 1-based line index.
            value = new Text(lines[count]);
            key = new LongWritable(count + 1);
            return true;
        }
        return false;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (lines == null || lines.length == 0 || key == null) {
            return 0;
        }
        return Math.min(1.0f, key.get() / (float) lines.length);
    }

    @Override
    public void close() throws IOException {
        // Nothing to clean up: the PDF and input stream are closed in initialize().
    }
}

//Mapper Class
package com.tcs;


import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit each line of extracted text as the key and its line number as the value.
        context.write(value, key);
    }
}


//Reducer Class
package com.tcs;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Write each line of text back out with its line number.
        for (LongWritable lineNumber : values) {
            context.write(key, lineNumber);
        }
    }
}

Best answer

I think you are already packaging the job as a jar and running it. Just take the part-r-00000 file generated as the MapReduce output, create a table in HBase, and load that file into it.
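Another way to do it from the same program is to point the reduce side of the job at HBase with TableMapReduceUtil and a TableReducer instead of writing text files. The sketch below is an assumption-laden illustration rather than code from the question: the class name PdfToHBaseDriver, the table name "pdf_lines", the column family "cf" and the qualifier "line" are all made up, the table is expected to exist already (with column family "cf"), and the HBase client and mapreduce jars must be on the job's classpath.

//HBase driver sketch (hypothetical names)
package com.tcs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfToHBaseDriver {

    // Turns each (line text, line number) pair coming from WordCountMapper into an HBase Put.
    public static class HBaseWriterReducer
            extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {

        private static final byte[] CF = Bytes.toBytes("cf");    // assumed column family
        private static final byte[] COL = Bytes.toBytes("line"); // assumed qualifier

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable lineNumber : values) {
                // Row key = line number, cell value = the line of text.
                byte[] row = Bytes.toBytes(String.valueOf(lineNumber.get()));
                Put put = new Put(row);
                put.addColumn(CF, COL, Bytes.toBytes(key.toString()));
                context.write(new ImmutableBytesWritable(row), put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "PdfToHBase");
        job.setJarByClass(PdfToHBaseDriver.class);

        job.setInputFormatClass(PdfInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Sets the reducer, the TableOutputFormat and the target table name on the job.
        TableMapReduceUtil.initTableReducerJob("pdf_lines", HBaseWriterReducer.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Alternatively, keeping the existing text output, the part-r-00000 file can be loaded afterwards with HBase's bundled ImportTsv tool, since TextOutputFormat writes tab-separated key/value lines by default.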

Regarding hadoop - How to insert MapReduce output into HBase using the same program, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44496777/
