
hadoop - Hadoop runs out of memory while processing a Wikipedia dump file in bz2


I am trying to run a MapReduce job over a Wikipedia dump file, and I have read that Hadoop can indeed decompress the file and split it for processing across the mappers.

Nevertheless, the job never finishes, and the logs show an out-of-memory error.

I have already looked at the project https://github.com/whym/wikihadoop/wiki, which provides an InputFormat called StreamWikiDumpInputFormat, but I cannot use it out of the box because my mapper and reducer are implemented for Hadoop 2.7.

Can anyone help me?

Edit

My job class is this:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import fiberClusterer.hadoop.fs.io.WholeFileInputFormat;
import uniandes.mapRed.WCMapper;
import uniandes.mapRed.WCReducer;

public class WordCounter {
    public static void main(String[] args) {
        if (args.length < 2) {
            System.exit(-1);
        }
        String entrada = args[0];
        String salida = args[1];
        try {
            ejecutarJob(entrada, salida);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void ejecutarJob(String entrada, String salida)
            throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job wcJob = Job.getInstance(conf, "WordCounter Job");
        wcJob.setJarByClass(WordCounter.class);

        wcJob.setMapperClass(WCMapper.class);

        wcJob.setMapOutputKeyClass(Text.class);
        wcJob.setMapOutputValueClass(Text.class);
        wcJob.setReducerClass(WCReducer.class);
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(Text.class);

        WholeFileInputFormat.setInputPaths(wcJob, new Path(entrada));
        wcJob.setInputFormatClass(WholeFileInputFormat.class);

        TextOutputFormat.setOutputPath(wcJob, new Path(salida));
        wcJob.setOutputFormatClass(TextOutputFormat.class);
        wcJob.waitForCompletion(true);
        System.out.println(wcJob.toString());
    }
}

My mapper is very simple:
import java.io.IOException;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<Text, Text, Text, Text> {
    Log log = LogFactory.getLog(WCMapper.class);

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {

        String[] lines = value.toString().split("\\r?\\n");

        log.info("line");
        for (String line : lines) {
            log.info("line");
            if (line.contains("name")) {
                context.write(new Text((new Date()).toString()), new Text(line));
            }
        }
    }
}

And my reducer:
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        for (Text iw : values) {
            context.write(new Text(""), new Text(iw));
        }
    }
}

This is the output when I check the logs with yarn:
2017-03-26 12:37:07,266 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at fiberClusterer.hadoop.fs.io.MyWholeFileReader.nextKeyValue(MyWholeFileReader.java:104)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

Best answer

I do not know why you are setting all those WholeFileInputFormats in your code. The Hadoop MapReduce framework takes care of compressed files on its own, including splittable compression such as bz2 (and LZO, when indexed). The only thing you need to make sure of is that the file extension is correct, in this case .bz2. The following code works with the .bz2 file extension.
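To illustrate the point about extensions, here is a minimal sketch (not part of the original answer; the class name and the file name are just placeholders) that asks Hadoop which codec it would pick for a given path. BZip2Codec implements SplittableCompressionCodec, which is why a plain TextInputFormat can split a .bz2 dump across several mappers:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class CodecCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);

        // The codec is chosen purely by file extension, so the dump must end in .bz2.
        CompressionCodec codec = factory.getCodec(new Path("enwiki-dump.xml.bz2"));
        if (codec == null) {
            System.out.println("No codec matched; the file would be read uncompressed.");
        } else {
            System.out.println("Codec: " + codec.getClass().getSimpleName()
                    + ", splittable: " + (codec instanceof SplittableCompressionCodec));
        }
    }
}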

JobDriver


Args: test.bz2 output


package tryout.mapred;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @author ramesh.b
 */
public class JobDriver extends Configured implements Tool {

    private static final Log log = LogFactory.getLog(JobDriver.class);

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        int res = ToolRunner.run(new Configuration(), new JobDriver(), args);
        long end = System.currentTimeMillis();

        System.out.println("Time spent in millis " + (end - start));
        System.exit(res);
    }

    @SuppressWarnings("deprecation")
    @Override
    public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        try {
            String inputPath = args[0];
            Path outputPath = new Path(args[1]);

            Configuration conf = getConf();

            Job job = new Job(conf);
            job.setJarByClass(JobDriver.class);
            job.setJobName("Simple.0.0");
            job.setReducerClass(SimpleReducer.class);

            job.setOutputFormatClass(TextOutputFormat.class);

            job.setMapperClass(SimpleMapper.class);
            // Plain TextInputFormat: Hadoop detects the bz2 codec from the file
            // extension and, because bz2 is splittable, splits the input across mappers.
            job.setInputFormatClass(TextInputFormat.class);

            FileSystem outfs = outputPath.getFileSystem(conf);

            if (outfs.exists(outputPath)) {
                outfs.delete(outputPath, true);
                log.info("deleted " + outputPath);
            }

            FileInputFormat.addInputPaths(job, inputPath);

            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, outputPath);

            return job.waitForCompletion(true) ? 0 : 1;

        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }
    }
}

SimpleMapper.java


package tryout.mapred;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SimpleMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.contains("xylophone"))
            context.write(key, value);
    }
}

SimpleReducer


package tryout.mapred;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SimpleReducer extends Reducer<LongWritable, Text, NullWritable, Text> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text line : values) {
            context.write(NullWritable.get(), line);
        }
    }
}
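Assuming the three classes above are packaged into a single jar (the jar file name below is only an illustration), the driver would be submitted with the arguments shown earlier, for example:

hadoop jar simple-bz2-job.jar tryout.mapred.JobDriver test.bz2 output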

Regarding "hadoop - Hadoop runs out of memory while processing a Wikipedia dump file in bz2", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/43024409/
