gpt4 book ai didi

hadoop - MapReduce 示例

转载 作者:可可西里 更新时间:2023-11-01 14:19:33 24 4
gpt4 key购买 nike

我正在阅读有关 mapreduce 的文章,我想知道一个特定的场景。假设我们有几个文件(例如 fileA、fileB、fileC),每个文件都包含多个整数。如果我们想对所有文件中的数字进行排序以创建如下内容:

23 fileA
34 fileB
35 fileA
60 fileA
60 fileC

map 和 reduce 过程如何工作?

目前,这是我所拥有的,但不太正确;

  1. (fileName, fileContent) -> (map to) (Number, fileName)

  2. 对临时键值对进行排序并得到(Number, (list of){fileName1, fileName2...})

  3. 减少临时对得到

    (Number, fileName1)
    (Number, fileName2)

    等等

问题是在排序阶段,文件名可能不是按字母顺序排列的,因此 reduce 部分不会生成正确的输出。有人可以就这种情况的正确方法提供一些见解吗?

最佳答案

实现此目的的最佳方法是通过二次排序。您需要对键(在您的案例编号中)和值(在您的案例文件名中)进行排序。在 Hadoop 中,映射器输出仅按键排序。

这可以通过使用复合键来实现:该键是数字和文件名的组合。例如对于第一条记录, key 将是 (23, fileA),而不仅仅是 (23)。

您可以在此处阅读有关二次排序的信息:https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch01.html

您还可以阅读“Hadoop 权威指南”一书中的“二级排序”部分。

为了简单起见,我写了一个程序来实现同样的目的。

在这个程序中,映射器默认对键进行排序。我已经编写了一个逻辑来对 reducer 端的值进行排序。因此它负责对键和值进行排序并生成所需的输出。

程序如下:

package com.myorg.hadooptests;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.*;

public class SortedValue {


public static class SortedValueMapper
extends Mapper<LongWritable, Text , Text, IntWritable>{

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String[] tokens = value.toString().split(" ");

if(tokens.length == 2) {
context.write(new Text(tokens[1]), new IntWritable(Integer.parseInt(tokens[0])));
}
}
}

public static class SortedValueReducer
extends Reducer<Text, IntWritable, IntWritable, Text> {

Map<String, ArrayList<Integer>> valueMap = new HashMap<String, ArrayList<Integer>>();

public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {

String keyStr = key.toString();
ArrayList<Integer> storedValues = valueMap.get(keyStr);

for (IntWritable value : values) {
if (storedValues == null) {
storedValues = new ArrayList<Integer>();
valueMap.put(keyStr, storedValues);
}
storedValues.add(value.get());
}

Collections.sort(storedValues);
for (Integer val : storedValues) {
context.write(new IntWritable(val), key);
}
}
}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf, "CompositeKeyExample");
job.setJarByClass(SortedValue.class);
job.setMapperClass(SortedValueMapper.class);
job.setReducerClass(SortedValueReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path("/in/in1.txt"));
FileOutputFormat.setOutputPath(job, new Path("/out/"));

System.exit(job.waitForCompletion(true) ? 0:1);

}
}

映射器逻辑:

  1. 解析每一行。假定键和值由空白字符 ("") 分隔。
  2. 如果该行包含 2 个标记,它会发出(文件名,整数值)。例如对于第一条记录,它发出 (fileA, 23)。

Reducer 逻辑:

  1. 它将 (key, value) 对放在一个 HashMap 中,其中 key 是文件名,value 是该文件的整数列表。例如对于 fileA,存储的值将是 23、34 和 35

  2. 最后,它对特定键的值和从 reducer 发出的每个值 (value, key) 进行排序。例如对于fileA,记录输出为:(23, fileA), (34, fileA) and (35, fileA)

我为以下输入运行了这个程序:

34 fileB
35 fileA
60 fileC
60 fileA
23 fileA

我得到了以下输出:

23      fileA
35 fileA
60 fileA
34 fileB
60 fileC

关于hadoop - MapReduce 示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34259487/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com