gpt4 book ai didi

apache-spark - 查找特定节点的连接组件而不是整个图(GraphFrame/GraphX)

转载 作者:行者123 更新时间:2023-12-03 17:40:40 24 4
gpt4 key购买 nike

我在 Spark 中创建了一个 GraphFrame,该图目前如下所示:



基本上,会有很多这样的子图,其中每个子图都将彼此断开。给定一个特定的节点 ID,我想在子图中找到所有其他节点。例如,如果给定节点 ID 1,则图将遍历并返回 2,10,20,3,30。

我创建了一个主题,但它没有给出正确的结果。

testgraph.find("(a)-[]->(b); (c)-[]->(b)").filter("(a.id = '1')").show()

不幸的是,连通分量函数考虑了整个图。是否可以使用 在给定特定节点 ID 的情况下获取断开连接的子图中的所有节点? GraphFrame/GraphX ?

最佳答案

获取与特定顶点相关的连接组件可以使用 BFS 遍历来完成,该遍历从该顶点开始并在多个跃点上收集其所有邻居。
这可以通过 GraphX 提供的 Pregel API 简单地完成,我们应该在其中实现 vertexProgram、sendMessage 和 mergeMes​​sages 函数。该算法在接收到初始消息时触发。中心向其邻居发送消息,该消息将传播给他们的邻居,依此类推,直到覆盖连接的组件。每个接收到 msg 的顶点都会被检查,这样它就不会在接下来的迭代中被激活。
下面是这个方法的实现:

import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object ConnectedComponent extends Serializable {

def main(args = Array[String]) = {

val conf = new SparkConf().setAppName("ConnectedComponent").setMaster("local")
val sc = new SparkContext(conf)
val vRDD = sc.objectFile[(VertexId,Int)]("/path/to/vertex/rdd/file/")
val eRDD = sc.objectFile[Edge[Int]]("/path/to/edge/rdd/file/")
val graph = Graph(vRDD, eRDD)
val centerOfCC = graph.pickRandomVertex()
var cc = extractCC(graph, center)
cc.vertices.collect.foreach(println)

sc.stop()
}

def extractCC(g: Graph[Int, Int], center: VertexId): Graph[Int, Int] = {
/* Return a subgraph of the input graph containing 'center' with the connected component
*/
val initialGraph = g.mapVertices((id, attr) => VertexData(attr, false, false, center))
val connectedComponent = initialGraph.pregel(initialMsg = 0)(vprog, sendMsg, mergeMsgs)
.subgraph(vpred = (id, attr) => attr.checked == true)
.mapVertices((id, vdata) => vdata.attr)
connectedComponent
}


case class VertexData( var attr : Int, // label of the vertex
var checked : Boolean, // check visited vertices
var propagate : Boolean, // allow forwarding msgs or not
var center: VertexId) // ID of the connectedComponent center
def vprog(id:VertexId, vdata: VertexData, msg: Int): VertexData = {

val attr : Int = vdata.attr
var checked : Boolean = vdata.checked
var propagate : Boolean = vdata.propagate
val center : VertexId = vdata.center

if (checked==false && msg == 0 && id==center) {
propagate = true
checked = true
}
else if(checked==false && msg == 1) {
propagate = true
checked = true
}
else if(checked == true && msg == 1){
propagate = false
}
new VertexData(attr, checked, propagate, center)
}

def sendMsg(triplet: EdgeTriplet[VertexData, Int]):Iterator[(VertexId, Int)] = {
var it : Iterator[(VertexId, Int)] = Iterator()
if(triplet.dstAttr.propagate==true)
it = it ++ Iterator((triplet.srcId, 1))
if(triplet.srcAttr.propagate==true)
it = it ++ Iterator((triplet.dstId, 1))
it
}

def mergeMsgs(a: Int, b: Int): Int = math.max(a, b)
}

关于apache-spark - 查找特定节点的连接组件而不是整个图(GraphFrame/GraphX),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37464068/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com