gpt4 book ai didi

apache-spark - 在生成的图上运行 Spark GraphX 算法的问题

转载 作者:行者123 更新时间:2023-12-04 04:17:00 27 4
gpt4 key购买 nike

我使用以下代码在 Spark GraphX 中创建了一个图形。 (见 my question and solution)

import scala.math.random
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random
import org.apache.spark.HashPartitioner

object SparkER {

val nPartitions: Integer = 4
val n: Long = 100
val p: Double = 0.1

def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
(0L until n).filter(_ % nPartitions == i).toIterator
}

def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
(i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j, ()))
}

def genEdgesForPartition(iter: Iterator[Long]) = {
val random = new Random(new java.security.SecureRandom())
iter.flatMap(genEdgesForId(p, n, random))
}

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark ER").setMaster("local[4]")
val sc = new SparkContext(conf)

val empty = sc.parallelize(Seq.empty[Int], nPartitions)
val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))

val edges = ids.mapPartitions(genEdgesForPartition)
val vertices: VertexRDD[Unit] = VertexRDD(ids.map((_, ())))

val graph = Graph(vertices, edges)

val cc = graph.connectedComponents().vertices //Throwing Exceptions

println("Stopping Spark Context")
sc.stop()
}
}

现在,我可以访问图表并查看节点的度数。但是,当我尝试获取某些度量值(例如 Connected components)时,会遇到以下异常。
15/12/22 12:12:57 ERROR Executor: Exception in task 3.0 in stage 6.0 (TID 19)
java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
15/12/22 12:12:57 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 17)
java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

为什么我无法使用 GraphX 在生成的图形上执行这些操作?

最佳答案

我发现,如果我执行以下操作,则不会发生异常。

val graph = Graph(vertices, edges).partitionBy(PartitionStrategy.RandomVertexCut)

显然,一些 GraphX 算法需要重新分区。但目的对我来说并不完全清楚。

关于apache-spark - 在生成的图上运行 Spark GraphX 算法的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34421308/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com