java - Java MapReduce count by date

Reposted. Author: 行者123. Updated: 2023-12-02 20:56:34

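I'm new to Hadoop, and I'm trying to write a MapReduce program that counts, per date (grouped by month), the two letters that occur most often. My input looks like this: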

2017-06-01 , A, B, A, C, B, E, F 
2017-06-02 , Q, B, Q, F, K, E, F
2017-06-03 , A, B, A, R, T, E, E
2017-07-01 , A, B, A, C, B, E, F
2017-07-05 , A, B, A, G, B, G, G

So, as the result of this MapReduce program, I expect:
2017-06,  A:4, E:4
2017-07, A:4, B:4
public class ArrayGiulioTest {

    public static Logger logger = Logger.getLogger(ArrayGiulioTest.class);

    public static class CustomMap extends Mapper<LongWritable, Text, Text, TextWritable> {
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            TextWritable array = new TextWritable();
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line, ",");
            String dataAttuale = tokenizer.nextToken().substring(0,
                    line.lastIndexOf("-"));

            Text tmp = null;
            Text[] tmpArray = new Text[tokenizer.countTokens()];
            int i = 0;
            while (tokenizer.hasMoreTokens()) {
                String prod = tokenizer.nextToken(",");

                word.set(dataAttuale);
                tmp = new Text(prod);
                tmpArray[i] = tmp;

                i++;
            }

            array.set(tmpArray);

            context.write(word, array);
        }
    }

    public static class CustomReduce extends Reducer<Text, TextWritable, Text, Text> {

        public void reduce(Text key, Iterator<TextWritable> values,
                Context context) throws IOException, InterruptedException {

            MapWritable map = new MapWritable();
            Text txt = new Text();

            while (values.hasNext()) {
                TextWritable array = values.next();
                Text[] tmpArray = (Text[]) array.toArray();
                for (Text t : tmpArray) {
                    if (map.get(t) != null) {
                        IntWritable val = (IntWritable) map.get(t);
                        map.put(t, new IntWritable(val.get() + 1));
                    } else {
                        map.put(t, new IntWritable(1));
                    }
                }
            }

            Set<Writable> set = map.keySet();
            StringBuffer str = new StringBuffer();
            for (Writable k : set) {
                str.append("key: " + k.toString() + " value: " + map.get(k) + "**");
            }
            txt.set(str.toString());

            context.write(key, txt);
        }
    }

    public static void main(String[] args) throws Exception {
        long inizio = System.currentTimeMillis();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "countProduct");
        job.setJarByClass(ArrayGiulioTest.class);

        job.setMapperClass(CustomMap.class);
        //job.setCombinerClass(CustomReduce.class);
        job.setReducerClass(CustomReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TextWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        long fine = System.currentTimeMillis();
        logger.info("**************************************End" + (fine - inizio));
        System.exit(1);
    }

}

And I have implemented my custom TextWritable this way:
public class TextWritable extends ArrayWritable {

    public TextWritable() {
        super(Text.class);
    }
}

So when I run my MapReduce program, I get this kind of result:
2017-6    wordcount.TextWritable@3e960865
2017-6 wordcount.TextWritable@3e960865

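Obviously my reducer isn't working. It looks like the output is just what my Mapper emits.

Any ideas? Can someone tell me whether this approach is on the right track?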

Here is the console log (FYI, my input file has 6 lines instead of 5).
*I get the same result whether I launch the MapReduce job under Eclipse (single JVM) or with Hadoop and HDFS.
File System Counters
    FILE: Number of bytes read=1216
    FILE: Number of bytes written=431465
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=6
    Map output records=6
    Map output bytes=214
    Map output materialized bytes=232
    Input split bytes=97
    Combine input records=0
    Combine output records=0
    Reduce input groups=3
    Reduce shuffle bytes=232
    Reduce input records=6
    Reduce output records=6
    Spilled Records=12
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=0
    Total committed heap usage (bytes)=394264576
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters
    Bytes Read=208
File Output Format Counters
    Bytes Written=1813

Best answer

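I think you're trying to do too much work in the Mapper. You only need to group by the date (and, judging by your expected output, you aren't formatting the date correctly anyway).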

The following approach, for example, turns these lines

2017-07-01 , A, B, A, C, B, E, F
2017-07-05 , A, B, A, G, B, G, G

into this pair for the reducer
2017-07 , ("A,B,A,C,B,E,F", "A,B,A,G,B,G,G")

In other words, you gain no real benefit from using an ArrayWritable; just keep the values as Text.
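To make the grouping concrete, here is a minimal plain-Java sketch (no Hadoop involved) of what the shuffle would hand to the reducer if the mapper emits a month key and a comma-joined string. The class name MonthGroupingSketch and its helper methods are hypothetical names chosen for illustration, and java.time is used here only for brevity; it assumes every line starts with a valid yyyy-MM-dd date followed by a comma.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Hadoop: imitates locally what the reducer
// receives when the mapper emits (month, "A,B,...") pairs.
final class MonthGroupingSketch {

    // Turns "2017-07-01 , A, B" into the key "2017-07".
    static String monthKey(String line) {
        String date = line.substring(0, line.indexOf(',')).trim();
        return YearMonth.from(LocalDate.parse(date)).toString();
    }

    // Turns "2017-07-01 , A, B" into the value "A,B" (all whitespace removed).
    static String letters(String line) {
        return line.substring(line.indexOf(',') + 1).replaceAll("\\s", "");
    }

    // Collects the values of all lines that share a month key, sorted by key.
    static Map<String, List<String>> group(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            grouped.computeIfAbsent(monthKey(line), k -> new ArrayList<>()).add(letters(line));
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2017-07-01 , A, B, A, C, B, E, F",
                "2017-07-05 , A, B, A, G, B, G, G");
        System.out.println(group(lines));
    }
}
```

In a real job Hadoop does this grouping for you during the shuffle; the sketch only shows the shape of the data on each side.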

So, the Mapper would look like this
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

class CustomMap extends Mapper<LongWritable, Text, Text, Text> {

    private final Text key = new Text();
    private final Text output = new Text();

    @Override
    protected void map(LongWritable offset, Text value, Context context) throws IOException, InterruptedException {

        final String valueStr = value.toString();
        // Use String.indexOf: Text.find returns a byte offset, not a char index.
        int separatorIndex = valueStr.indexOf(',');
        if (separatorIndex < 0) {
            System.err.printf("mapper: not enough records for %s%n", valueStr);
            return;
        }
        // Everything before the first comma is the date; the rest are the letters.
        String dateKey = valueStr.substring(0, separatorIndex).trim();
        String tokens = valueStr.substring(1 + separatorIndex).trim().replaceAll("\\p{Space}", "");

        SimpleDateFormat fmtFrom = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat fmtTo = new SimpleDateFormat("yyyy-MM");

        try {
            // Reduce the full date to its month, e.g. 2017-07-01 becomes 2017-07.
            dateKey = fmtTo.format(fmtFrom.parse(dateKey));
            key.set(dateKey);
        } catch (ParseException ex) {
            System.err.printf("mapper: invalid key format %s%n", dateKey);
            return;
        }

        output.set(tokens);
        context.write(key, output);
    }
}

Then, the reducer can build a Map that collects and counts the values from the value strings. Again, write out just Text.
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

class CustomReduce extends Reducer<Text, Text, Text, Text> {

    private final Text output = new Text();

    @Override
    protected void reduce(Text date, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        // TreeMap keeps the letters sorted alphabetically in the output.
        Map<String, Integer> keyMap = new TreeMap<>();
        for (Text v : values) {
            String[] keys = v.toString().trim().split(",");

            for (String key : keys) {
                // Start at 1 for a new letter, otherwise add 1 to its count.
                keyMap.merge(key, 1, Integer::sum);
            }
        }

        output.set(mapToString(keyMap));
        context.write(date, output);
    }

    // Formats the counts as "A:4, B:4, ...".
    private String mapToString(Map<String, Integer> map) {
        StringBuilder sb = new StringBuilder();
        String delimiter = ", ";
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            if (sb.length() > 0) {
                sb.append(delimiter);
            }
            sb.append(String.format("%s:%d", entry.getKey(), entry.getValue()));
        }
        return sb.toString();
    }
}
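Note the @Override and the Iterable parameter on reduce. In the new API, Reducer.reduce takes an Iterable, and its default implementation simply writes every value through unchanged. The reduce in your question takes an Iterator, so it is most likely an overload that Hadoop never calls, which would match your counters (Reduce input records=6, Reduce output records=6) and the TextWritable@... output. The plain-Java sketch below shows the effect without Hadoop; BaseReducer, IteratorReducer and IterableReducer are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical stand-in for a framework base class. It is NOT Hadoop's Reducer;
// it only mimics the shape of reduce(key, Iterable).
class BaseReducer {
    // The "framework" only ever calls this signature.
    String reduce(String key, Iterable<String> values) {
        return "default";
    }

    String run(String key, Iterable<String> values) {
        return reduce(key, values);
    }
}

class IteratorReducer extends BaseReducer {
    // Overload, not override: Iterator is a different parameter type.
    // Adding @Override here would be a compile error, which is why it helps.
    String reduce(String key, Iterator<String> values) {
        return "custom";
    }
}

class IterableReducer extends BaseReducer {
    @Override
    String reduce(String key, Iterable<String> values) {
        return "custom";
    }
}

final class OverrideSketch {
    public static void main(String[] args) {
        Iterable<String> values = Arrays.asList("A", "B");
        System.out.println(new IteratorReducer().run("2017-06", values)); // default
        System.out.println(new IterableReducer().run("2017-06", values)); // custom
    }
}
```

Annotating every map and reduce method with @Override lets the compiler catch this kind of signature mismatch before the job runs.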

With your input, I get
2017-06 A:4, B:4, C:1, E:4, F:3, K:1, Q:2, R:1, T:1
2017-07 A:4, B:4, C:1, E:1, F:1, G:3
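That output lists every letter, whereas the question asked for only the two most frequent per month. One possible way to narrow it down, sketched here under the assumption that ties are broken alphabetically (the question does not say how ties should be handled), is to sort the count map by value before formatting it. TopCountsSketch and topN are hypothetical names, not part of the code above.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper: picks the n highest counts out of a letter-count map.
final class TopCountsSketch {

    // Highest count first; equal counts are ordered alphabetically by letter.
    private static final Comparator<Map.Entry<String, Integer>> BY_COUNT_DESC_THEN_KEY =
            Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder())
                    .thenComparing(Map.Entry.<String, Integer>comparingByKey());

    // Formats the top n entries as "A:4, B:4".
    static String topN(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                .sorted(BY_COUNT_DESC_THEN_KEY)
                .limit(n)
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // The 2017-07 counts from the output above.
        Map<String, Integer> july = new TreeMap<>();
        july.put("A", 4);
        july.put("B", 4);
        july.put("C", 1);
        july.put("E", 1);
        july.put("F", 1);
        july.put("G", 3);
        System.out.println(topN(july, 2)); // A:4, B:4
    }
}
```

In the reducer, this could replace the call to mapToString. For 2017-06, A, B and E all have a count of 4, so which two are reported depends entirely on the tie-breaking rule you choose.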

Regarding java - Java MapReduce count by date, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44399163/
