gpt4 book ai didi

java - Hadoop Map Reduce For Google web graph

转载 作者:可可西里 更新时间:2023-11-01 14:16:16 24 4
gpt4 key购买 nike

我们的任务是创建 map reduce 函数,该函数将为 google 网络图中的每个节点 n 输出,列出您可以在 3 跳中从节点 n 到达的节点。 (实际数据可以在这里找到:http://snap.stanford.edu/data/web-Google.html)
以下是列表中项目的示例:

1 2 
1 3
2 4
3 4
3 5
4 1
4 5
4 6
5 6

从上面的示例图将是这个
Graph example

在上面的简化示例中,例如节点 1 的路径是
α [1 -> 2 -> 4 -> 1], [1 -> 2 -> 4 -> 5], [1 -> 2 -> 4 -> 6], [1 -> 3 -> 4 -> 1],
[1 -> 3 -> 4 -> 5], [1 -> 3 -> 4 -> 6] και [1 -> 3 -> 5 -> 6]
因此 map reduce 将为节点 1 输出顶点 1,5,6 (
(a) 每个顶点只能计数一次,并且
(b) 我们仅在存在长度为 3 的圆形路径时才包含当前顶点,例如 [1 -> 2 -> 4 -> 1] 和 [1 -> 3 -> 4 -> 1]。

我很迷茫,因为我相信这需要图论和算法的知识,而我们没有被教过任何与此相关的知识。

如果有人能给我一个正确的开始方向,我将不胜感激。 (我已经研究了最短路径理论等,但我不确定它是否对这个特定的练习有用)

提前致谢,祝您假期愉快。

编辑

我尝试创建邻接列表,但是虽然我希望输出为“vertexID”“node1 node2 node3 node4...”,但我看到在输出文件中我的 reducer 将每个顶点 Id 的列表分成三对。

例如,如果我有连接到 Z、X、C、V、B、N、M、G、H、J、K、L 的顶点 A,它输出为

A Z,X,C

A V, B, N

A M, G, H

A J,K,L

下面是我的映射器和 reducer
public class AdjacentsListDriver extends Configured implements Tool {

@Override
public int run(String[] args) throws Exception {



Configuration conf = getConf();
Job job = Job.getInstance(conf);
job.setJobName("Test driver");
job.setJarByClass(AdjacentsListDriver.class);

String[] arg0 = new GenericOptionsParser(conf, args).getRemainingArgs();
if (arg0.length != 2) {
System.err.println("Usage: hadoop jar <my_jar> <input_dir> <output_dir>");
System.exit(1);
}

Path in = new Path(arg0[0]);
Path out = new Path(arg0[1]);

FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);

job.setMapperClass(ListMapper.class);
job.setReducerClass(ListReducer.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);

return 0;
}

public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new AdjacentsListDriver(), args);
System.exit(res);

}



}





/**
* @author George
* Theoretically this takes a key(vertexID) and maps all nodes that are connected to it in one hop....
*
*/
public class ListMapper extends Mapper<LongWritable, Text, Text, Text> {

private Text vertexID = new Text();
//private LongWritable vertice= new LongWritable(0);
private Text vertice=new Text();

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();
StringTokenizer itr = new StringTokenizer(line,"\n");
StringTokenizer itrInside;

//vertice=new LongWritable(Long.valueOf(value.toString()).longValue());


while (itr.hasMoreTokens()) {
if(itr.countTokens() > 2){

}//ignore first line ??
else{
itrInside=new StringTokenizer(itr.toString());
vertexID.set(itr.nextToken());

while(itrInside.hasMoreTokens()){
vertice.set(itrInside.nextToken());
context.write(vertexID, value);
}
}
}

}

}

@override
public class ListReducer extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

String vertices="";

for (Text value : values) {
if(vertices=="")
vertices+=value.toString();
else
vertices=vertices+","+value.toString();
}

Text value=new Text();
value.set(vertices);
context.write(key, value);

}

}

最佳答案

由于这是您的(家庭作业)作业,因此我不会包括 Java/Hadoop 解决方案,但我会尝试使 MapReduce 的图形计算概念对您更清晰一些,以便您可以自己实现它。

对于每个顶点,您想要 的顶点正好 n 跳远。在查看最短路径算法时,您走在正确的路径上,但通过简单的广度优先搜索可以更轻松地解决该问题。

然而,当使用 MapReduce 处理图形时,您需要更深入地研究顶点之间的消息传递。图算法通常用多个作业表示,其中 map 和 reduce 阶段具有以下分配:

  • Mapper:向另一个顶点发送消息(通常针对顶点的每个邻居)
  • Reducer:对传入的消息进行分组,加入核心图并减少它们,有时会发送更多消息。

  • 每个作业将始终对前一个作业的输出进行操作,直到您达到结果或放弃为止。

    数据准备

    在您真正想要运行图形算法之前,请确保您的数据采用邻接表的形式。这将使以下迭代算法更容易实现。

    因此,您需要按顶点 id 对它们进行分组,而不是您的邻接元组。下面是一些伪代码:
    map input tuple (X, Y): 
    emit (X, Y)

    reduce input (X, Y[]) :
    emit (X, Y[])

    基本上,您只是按顶点 id 分组,因此您的输入数据是其邻居的键(顶点 id)(可以从该特定键顶点 id 到达的顶点 id 列表)。如果你想节省资源,你可以使用reducer作为combiner。

    算法

    就像我已经提到的,你只需要一个广度优先搜索算法。您将对图中的每个顶点执行广度优先搜索算法,当遇到邻居时,您只需增加一个跳跃计数器,告诉我们距离起始顶点有多远(这是最短路径算法的最简单情况,即当边权重为 1) 时。

    让我给你看一张简单的图片,用一个简单的图表来描述它。橙色表示已访问,蓝色未访问,绿色是我们的结果。括号中是跳数计数器。

    simple graph hopcounting

    你看,在每次迭代中,我们都为每个顶点设置了一个跳跃计数器。如果我们碰到一个新的顶点,我们只会把它加一。如果我们命中第 n 个顶点,我们会以某种方式标记它以供以后查找。

    使用 MapReduce 进行分发

    虽然对每个顶点运行广度优先搜索似乎很浪费,但我们可以通过并行化它做得更好。这里是传递发挥作用的消息。就像上图一样,我们在映射步骤中得到的每个顶点最初都会向其邻居发送一条消息,其中包含以下有效载荷:
    HopMessage: Origin (VertexID) | HopCounter(Integer)

    在第一次迭代中,我们将尝试向邻居发送消息以启动计算。否则,我们将只代理图形或传入消息。

    因此,在您准备好数据后的第一份工作中,您的 map 和 reduce 看起来像这样:
    map input (VertexID key, either HopMessage or List<VertexID> adjacents):
    if(iterationNumber == 1): // only in the first iteration to kick off
    for neighbour in adjacents:
    emit (neighbour, new HopMessage(key, 0))
    emit (key, adjacents or HopMessage) // for joining in the reducer

    reducer 现在在图形和消息之间进行简单的连接,主要是为了获取顶点的邻居,从而导致该输入(以我的简单图形为例):
    1 2 // graph 
    2 1 // hop message
    2 3 // graph
    3 1 // hop message
    3 4 // graph
    4 1 // hop message
    4 - // graph

    在 reducer 步骤中,我们将再次将消息转发给邻居,并在递增后检查跃点计数器是否已经达到 3。
    reducer input(VertexID key, List<either HopMessage or List<VertexID> neighbours> values):
    for hopMessage in values:
    hopMessage.counter += 1
    if (hopMessage.counter == 3)
    emit to some external resource (HopMessage.origin, key)
    else
    for neighbour in neighbours of key:
    emit (neighbour, hopMessage)
    emit (key, neighbours)

    如您所见,这里可能会变得非常困惑:您需要管理两种不同类型的消息,然后还要写入一些外部资源,这些资源将跟踪正好相距 3 跳的顶点。

    只要有要发送的 HopMessages,您就可以安排迭代的作业。这很容易出现图中的循环问题,因为在这种情况下您将无限增加 hopcounter。所以我建议要么用每条消息发送到目前为止的完整遍历路径(非常浪费),要么简单地限制迭代次数。在 n=3 的情况下,不需要 3 次以上的作业迭代。

    有很多博客和项目可以为您提供有关如何处理 Hadoop 中的每个问题的示例。至少我在我的博客中写过 MapReduce 中的图形处理,你可以在我的 github 上找到一些例子。

    清洗输出数据

    最后,您将拥有一堆包含顶点 -> 顶点映射的文件。您可以按照在准备中所做的相同方式减少它们。

    使用 Pregel 的更好方法

    处理图的一种不太麻烦的方法是使用 Pregel 方式来表达图计算。 Pregel 正在使用以顶点为中心的模型,并且可以更轻松地表达这种广度优先计算。

    以下是使用 Apache Hama 的上述算法的简单示例:
      public static class NthHopVertex extends Vertex<Text, NullWritable, HopMessage> {

    @Override
    public void compute(Iterable<HopMessage> messages) throws IOException {
    if (getSuperstepCount() == 0L) {
    HopMessage msg = new HopMessage();
    msg.origin = getVertexID().toString();
    msg.hopCounter = 0;
    sendMessageToNeighbors(msg);
    } else {
    for (HopMessage message : messages) {
    message.hopCounter += 1;
    if (message.hopCounter == 3) {
    getPeer().write(new Text(message.origin), getVertexID());
    voteToHalt();
    } else {
    sendMessageToNeighbors(message);
    }

    }
    }
    }
    }

    顺便说一句,在您的示例中创建的新图如下所示:
    1=[1, 5, 6]
    2=[2, 3, 6]
    3=[2, 3, 6]
    4=[4, 5]

    这是完整的 Hama Graph 实现:

    https://github.com/thomasjungblut/tjungblut-graph/blob/master/src/de/jungblut/graph/bsp/NthHop.java

    关于java - Hadoop Map Reduce For Google web graph,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20774253/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com