gpt4 book ai didi

scala - 在 Spark GraphX 中实现拓扑排序

转载 作者:行者123 更新时间:2023-12-04 04:40:30 25 4
gpt4 key购买 nike

我正在尝试使用 Spark 的 GraphX 库实现拓扑排序(topological sort)。

这是我到目前为止编写的代码:

MyObject.scala

import java.util.ArrayList

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.EdgeDirection
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Graph.graphToGraphOps
import org.apache.spark.graphx.VertexId
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

/**
 * Example Spark application that builds a small GraphX graph of Resource
 * vertices connected by Relation edges and then tries to print a topological
 * ordering of the vertices.
 *
 * NOTE(review): topoSort as written invokes RDD operations from inside the
 * closure of another RDD's foreach; see the notes inside topoSort.
 */
object MyObject {

/**
 * Entry point: creates a SparkContext with master "local[2]", builds the
 * sample resources and relations, assembles the graph, runs topoSort on it
 * and prints the result.
 *
 * @param args command-line arguments (not read by this method)
 */
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Spark-App").setMaster("local[2]")
val sc = new SparkContext(conf)

val resources: RDD[Resource] = makeResources(sc)
val relations: RDD[Relation] = makeRelations(sc)

println("Building graph ...")
// Declared as var, but never reassigned in this method.
var graph = buildGraph(resources, relations, sc)
println("Graph built!!")

println("Testing topo sort ...")
val topoSortResult = topoSort(graph, sc);
println("topoSortResult = " + topoSortResult)
println("Testing topo sort done!")
}

/**
 * Builds a GraphX graph from the given resources and relations.
 *
 * Each Resource becomes a vertex keyed by its id; each Relation becomes an
 * Edge from srcId to dstId carrying the Relation itself as the edge attribute.
 *
 * @param resources vertex payloads
 * @param relations edge payloads
 * @param sc        Spark context (not used in the body)
 * @return the constructed graph
 */
def buildGraph(resources: RDD[Resource], relations: RDD[Relation], sc: SparkContext): Graph[Resource, Relation] =
{
val vertices: RDD[(Long, Resource)] = resources.map(resource => (resource.id, resource))
val edges: RDD[Edge[Relation]] = relations.map(relation => Edge(relation.srcId, relation.dstId, relation))
var graph = Graph[Resource, Relation](vertices, edges)
graph
}

/**
 * Creates the sample vertex data: five Resource objects with ids 1 to 5,
 * distributed as an RDD.
 *
 * @param sc Spark context used to parallelize the list
 * @return an RDD of the five sample resources
 */
def makeResources(sc: SparkContext): RDD[Resource] =
{
var list: List[Resource] = List()
list = list :+ new Resource(1L)
list = list :+ new Resource(2L)
list = list :+ new Resource(3L)
list = list :+ new Resource(4L)
list = list :+ new Resource(5L)
sc.parallelize(list)
}

/**
 * Creates the sample edge data: four "depends_on" relations, from resources
 * 1, 3, 4 and 5 to resource 2, distributed as an RDD.
 *
 * @param sc Spark context used to parallelize the list
 * @return an RDD of the four sample relations
 */
def makeRelations(sc: SparkContext): RDD[Relation] =
{
var list: List[Relation] = List()
list = list :+ new Relation(1L, "depends_on", 2L)
list = list :+ new Relation(3L, "depends_on", 2L)
list = list :+ new Relation(4L, "depends_on", 2L)
list = list :+ new Relation(5L, "depends_on", 2L)
sc.parallelize(list)

}

/**
 * Attempts a queue-based topological sort of the graph: vertices with
 * in-degree zero are queued, and each dequeued vertex is appended to the
 * result while the in-degree of its outgoing neighbours is decremented.
 *
 * @param graph the graph to sort
 * @param sc    Spark context (not used in the body)
 * @return the list of (vertex id, resource) pairs in the order they were dequeued
 */
def topoSort(graph: Graph[Resource, Relation], sc: SparkContext): java.util.List[(VertexId, Resource)] =
{
// Will contain the result
val sortedResources: java.util.List[(VertexId, Resource)] = new ArrayList()

// Contains all the vertices
val vertices = graph.vertices

// Contains all the vertices whose in-degree > 0
val inDegrees = graph.inDegrees;
// Ids of those vertices, collected into a local array
val inDegreesKeys_array = inDegrees.keys.collect();

// Contains all the vertices whose in-degree == 0
// (every vertex whose id is absent from the collected in-degree keys)
val inDegreeZeroList = vertices.filter(vertex => !inDegreesKeys_array.contains(vertex._1))

// An RDD of (vertexID, in-degree) pairs: the zero in-degree vertices paired with 0,
// unioned with the in-degrees reported by the graph
val inDegreeMapRDD = inDegreeZeroList.map(vertex => (vertex._1, 0)).union(inDegrees);

// Insert all the resources whose in-degree == 0 into a queue
val queue = new Queue[(VertexId, Resource)]
for (vertex <- inDegreeZeroList.toLocalIterator) { queue.enqueue(vertex) }

// Get an RDD containing the neighbours reached through the outgoing edges of every vertex
val neighbours = graph.collectNeighbors(EdgeDirection.Out)

// Initiate the algorithm
while (!queue.isEmpty) {
val vertex_top = queue.dequeue()
// Add the topmost element of the queue to the result
sortedResources.add(vertex_top)

// Get the neighbours (from outgoing edges) of this vertex
// This will be an RDD containing just 1 element which will be an array of neighbour vertices
val vertex_neighbours = neighbours.filter(vertex => vertex._1.equals(vertex_top._1))

// For each vertex, decrease its in-degree by 1
// NOTE: vertex_neighbours is an RDD, so this foreach is an RDD action and
// everything inside the closure below runs as part of that action.
vertex_neighbours.foreach(arr => {
val neighbour_array = arr._2
neighbour_array.foreach(vertex => {
// NOTE: this calls filter/first on inDegreeMapRDD from inside the foreach
// of another RDD; this is the line on which the SparkException
// referencing SPARK-5063 is reported.
val oldInDegree = inDegreeMapRDD.filter(vertex_iter => (vertex_iter._1 == vertex._1)).first()._2
val newInDegree = oldInDegree - 1
// Reflect the new in-degree in the in-degree map RDD
// NOTE: the value returned by map is not assigned to anything, so
// inDegreeMapRDD itself is left unchanged by this statement.
inDegreeMapRDD.map(vertex_iter => {
if (vertex_iter._1 == vertex._1) {
(vertex._1, newInDegree)
}
else{
vertex_iter
}
});
// Add this vertex to the result if its in-degree has become zero
// NOTE(review): this enqueue happens inside the RDD foreach closure;
// presumably the queue read by the enclosing while loop does not see
// it - confirm.
if (newInDegree == 0) {
queue.enqueue(vertex)
}
})
})
}

return sortedResources
}

}

Resource.scala
/**
 * A graph vertex payload identified by a numeric id.
 *
 * @param id identifier of the resource
 */
class Resource(val id: Long) extends Serializable {
  /** Renders the resource as "id = " followed by its id. */
  override def toString(): String = s"id = $id"
}

Relation.scala
/**
 * A named, directed relation between two resources.
 *
 * @param srcId id of the source resource
 * @param name  label of the relation
 * @param dstId id of the destination resource
 */
class Relation(val srcId: Long, val name: String, val dstId: Long) extends Serializable {
  /** Renders the relation as the source id, the name and the destination id, separated by single spaces. */
  override def toString(): String = s"$srcId $name $dstId"
}

我收到错误:
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

出错的代码行是 val oldInDegree = inDegreeMapRDD.filter(vertex_iter => (vertex_iter._1 == vertex._1)).first()._2 。

我想这是因为在其他一些 RDD 的 for-each 循环中修改 RDD 是非法的。

另外,我担心 queue.enqueue(vertex) 不会生效,因为在 for-each 循环内部无法修改本地集合(it is not possible to modify a local collection inside a for-each loop)。

如何正确实现这种拓扑排序算法?

异常的完整堆栈跟踪已上传 here (必须从外部上传以防止超出 StackOverflow 的主体尺寸限制)。

最佳答案

vertex_neighbours.foreach(arr => {
val neighbour_array = arr._2
neighbour_array.foreach(vertex => {
. . .

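外部 foreach 可以用 for 循环代替。原因是 vertex_neighbours 是一个 RDD,对它调用 foreach 时,闭包会作为 Spark 任务执行,而在任务内部不能再调用 inDegreeMapRDD.filter(...).first() 这样的 RDD 操作(即错误信息中 SPARK-5063 所说的限制)。注意,直接对 RDD 写 for (arr <- vertex_neighbours) 仍然会被编译成 RDD 的 foreach,所以应先把数据取回驱动程序(例如 vertex_neighbours.collect()),再用普通的 for 循环遍历得到的本地数组;这样 queue.enqueue(vertex) 修改的也是驱动程序上的本地队列。另外,inDegreeMapRDD.map(...) 的返回值没有被保存,入度实际上不会被更新;可以改用驱动程序端的可变 Map 来记录各顶点的入度。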

关于scala - 在 Spark GraphX 中实现拓扑排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40109279/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com