
java - Hadoop MapReduce job creates intermediate files that are too large


I wrote a MapReduce program, but when I try to run it on Hadoop it fails: it generates so much intermediate data that I get an error saying there is no more space left on the node. It then tries a second node, but the result is the same. I want to process two text files of roughly 60,000 lines each.

What I have tried so far:
- enabling Snappy compression, which did not help;
- adding more space, so that each of the two nodes has 50 GB of storage.
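For reference, map output (intermediate) compression was turned on roughly like this in the driver (a minimal sketch only, assuming the standard Hadoop 2.x property names and that the Snappy native libraries are available on the nodes):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

public class CompressionSettings {

    // Sketch only: compress the map output (the intermediate data) with Snappy.
    // Property names are the Hadoop 2.x ones; older releases use mapred.compress.map.output.
    public static void enableMapOutputCompression(Configuration conf) {
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                SnappyCodec.class, CompressionCodec.class);
    }
}

This only compresses the data written during the shuffle; it does not reduce the number of records the mapper emits.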

Since neither of these helped, maybe the problem is in the code rather than in the setup.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FirstMapper extends Mapper<LongWritable, Text, Text, Text> {

    enum POS_TAG {
        CC, CD, DT, EX,
        FW, IN, JJ, JJR,
        JJS, LS, MD, NN,
        NNS, NNP, NNPS, PDT,
        WDT, WP, POS, PRP,
        PRP$, RB, RBR, RBS,
        RP, SYM, TO, UH,
        VB, VBD, VBG, VBN,
        VBP, VBZ, WP$, WRB
    }

    private static final List<String> tags = Stream.of(POS_TAG.values())
            .map(Enum::name)
            .collect(Collectors.toList());
    private static final int MAX_NGRAM = 5;
    private static String[][] cands = {
            new String[3],
            new String[10],
            new String[32],
            new String[10]
    };

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();

        String location = conf.get("job.cands.path");

        if (location != null) {
            BufferedReader br = null;
            try {
                FileSystem fs = FileSystem.get(conf);
                Path path = new Path(location);

                if (fs.exists(path)) {
                    FSDataInputStream fis = fs.open(path);
                    br = new BufferedReader(new InputStreamReader(fis));

                    // Each line of the candidate file holds the space-separated
                    // patterns for one n-gram length (n = 2..5).
                    String line;
                    int i = 0;
                    while ((line = br.readLine()) != null) {
                        String[] splitted = line.split(" ");
                        cands[i] = splitted;
                        i++;
                    }
                }
            } catch (IOException e) {
                // fall back to the default (empty) candidate arrays
            } finally {
                if (br != null) {
                    br.close();
                }
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input lines are pre-tagged tokens of the form word/POS.
        String[] tokens = value.toString().split(" ");
        int m = tokens.length;

        for (int n = 2; n <= MAX_NGRAM; n++) {
            for (int s = 0; s <= m - n; s++) {
                for (int i = 0; i < cands[n - 2].length; i++) {
                    List<String> pattern = new ArrayList<>();
                    List<String> metWords = new ArrayList<>();

                    for (int j = 0; j <= n - 1; j++) {
                        String[] pair = tokens[s + j].split("/");
                        String word = pair[0];
                        String pos = pair[1];

                        char c = cands[n - 2][i].charAt(j);
                        addToPattern(word, pos, c, pattern);
                        if (c > 0 && tags.contains(pos)) {
                            metWords.add(word);
                        }
                    }
                    if (metWords.isEmpty()) {
                        metWords.add("_NONE");
                    }

                    // One output record per (n-gram length, start offset, candidate pattern).
                    Text resultKey = new Text(pattern.toString() + ";" + metWords.toString());
                    context.write(resultKey, new Text(key.toString()));
                }
            }
        }
    }

    public void addToPattern(String word, String pos, char c, List<String> pattern) {
        switch (c) {
            case 'w':
                pattern.add(word);
                break;
            case 'p':
                pattern.add(pos);
                break;
            default:
                pattern.add("_WC_");
                break;
        }
    }
}



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Main {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("job.cands.path", "/user/thelfter/pwp");

        Job job1 = Job.getInstance(conf, "word pattern1");
        job1.setJarByClass(Main.class);
        job1.setMapperClass(FirstMapper.class);
        job1.setCombinerClass(FirstReducer.class);
        job1.setReducerClass(FirstReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path("/user/thelfter/output"));
        System.exit(job1.waitForCompletion(true) ? 0 : 1);
    }
}

Best answer

If you are using YARN, the NodeManager's local disk space is controlled by yarn.nodemanager.local-dirs in the yarn-site.xml file, so whatever directories it points to need to have enough disk space.
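For example (a sketch only; the directory paths below are placeholders, not values taken from the question):

<property>
  <name>yarn.nodemanager.local-dirs</name>
  <!-- placeholder paths: point these at local disks with enough free space -->
  <value>/mnt/disk1/yarn/local,/mnt/disk2/yarn/local</value>
</property>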

Regarding java - Hadoop MapReduce job creates intermediate files that are too large, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/58738035/
