
scala - Spark and Scala: Apply a function to each element of an RDD


I have a VertexRDD[(VertexId, Long)] structured as follows:

(533, 1)
(571, 2)
(590, 0)
...

Each element consists of a vertex id (533, 571, 590, etc.) and its number of outgoing edges (1, 2, 0, etc.).
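For reference, a minimal RDD with this shape could be built like this (a sketch with made-up sample data, assuming a running SparkContext named sc):

import org.apache.spark.graphx.{VertexId, VertexRDD}

// hypothetical sample data mirroring the structure above
val sampleVertices: VertexRDD[Long] = VertexRDD(
  sc.parallelize(Seq((533L, 1L), (571L, 2L), (590L, 0L)))
)
sampleVertices.foreach(println) // (533,1) (571,2) (590,0)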

I want to apply a function to each element of this RDD. The function must compare the number of outgoing edges against 4 thresholds.

If the number of outgoing edges is less than or equal to one of the 4 thresholds, the corresponding vertex id must be inserted into an Array (or another similar data structure), so that in the end there are 4 data structures, each containing the ids of the vertices that satisfy the comparison with the corresponding threshold.

Ids that satisfy the comparison with the same threshold must accumulate in the same data structure. How can I implement this approach in parallel with Spark and Scala?

My code:

import org.apache.spark.graphx.{Graph, VertexRDD}
import scala.collection.mutable.ListBuffer

val usersGraphQuery = "MATCH (u1:Utente)-[p:PIU_SA_DI]->(u2:Utente) RETURN id(u1), id(u2), type(p)"
val usersGraph = neo.rels(usersGraphQuery).loadGraph[Any, Any]
val numUserGraphNodes = usersGraph.vertices.count
val numUserGraphEdges = usersGraph.edges.count
val maxNumOutDegreeEdgesPerNode = numUserGraphNodes - 1

// get id and number of outgoing edges of each node from the graph
// except those that have 0 outgoing edges (default behavior of the outDegrees API)
var userNodesOutDegreesRdd: VertexRDD[Int] = usersGraph.outDegrees

/* userNodesOutDegreesRdd.foreach(println)
* Now you can see
* (533, 1)
* (571, 2)
*/

// I also get ids of nodes with zero outgoing edges
var fixedGraph: Graph[Any, Any] = usersGraph.outerJoinVertices(userNodesOutDegreesRdd)(
  (vid: Any, defaultOutDegrees: Any, outDegOpt: Option[Any]) => outDegOpt.getOrElse(0L)
)
var completeUserNodesOutDregreesRdd = fixedGraph.vertices

/* completeUserNodesOutDregreesRdd.foreach(println)
* Now you can see
* (533, 1)
* (571, 2)
* (590, 0) <--
*/

// 4 thresholds that identify the 4 clusters of User nodes based on the number of their outgoing edges
var soglia25: Double = (maxNumOutDegreeEdgesPerNode.toDouble/100)*25
var soglia50: Double = (maxNumOutDegreeEdgesPerNode.toDouble/100)*50
var soglia75: Double = (maxNumOutDegreeEdgesPerNode.toDouble/100)*75
var soglia100: Double = maxNumOutDegreeEdgesPerNode
println("soglie: "+soglia25+", "+soglia50+", "+soglia75+", "+soglia100)

// containers of individual clusters
var lowSAUsers = new ListBuffer[(Long, Any)]()
var mediumLowSAUsers = new ListBuffer[(Long, Any)]()
var mediumHighSAUsers = new ListBuffer[(Long, Any)]()
var highSAUsers = new ListBuffer[(Long, Any)]()
// overall container of the 4 clusters
var clustersContainer = new ListBuffer[ (String, ListBuffer[(Long, Any)]) ]()

// I WANT PARALLEL FROM HERE -----------------------------------------------
// from RDD to Array
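// NOTE: take() pulls the elements back to the driver, so everything below
// runs sequentially on a local Array rather than in parallel on the cluster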
var completeUserNodesOutDregreesArray = completeUserNodesOutDregreesRdd.take(numUserGraphNodes.toInt)

// analyze each User node and assign it to the cluster it belongs to
for (i <- 0 until numUserGraphNodes.toInt) {
  // compare the number of outgoing edges (converted to a string)
  // with the thresholds to determine which class the User node goes in
  if (completeUserNodesOutDregreesArray(i)._2.toString.toLong <= soglia25) {
    println("ok soglia25")
    lowSAUsers += completeUserNodesOutDregreesArray(i)
  } else if (completeUserNodesOutDregreesArray(i)._2.toString.toLong <= soglia50) {
    println("ok soglia50")
    mediumLowSAUsers += completeUserNodesOutDregreesArray(i)
  } else if (completeUserNodesOutDregreesArray(i)._2.toString.toLong <= soglia75) {
    println("ok soglia75")
    mediumHighSAUsers += completeUserNodesOutDregreesArray(i)
  } else if (completeUserNodesOutDregreesArray(i)._2.toString.toLong <= soglia100) {
    println("ok soglia100")
    highSAUsers += completeUserNodesOutDregreesArray(i)
  }
}

// I put each cluster in the final container
clustersContainer += Tuple2("lowSAUsers", lowSAUsers)
clustersContainer += Tuple2("mediumLowSAUsers", mediumLowSAUsers)
clustersContainer += Tuple2("mediumHighSAUsers", mediumHighSAUsers)
clustersContainer += Tuple2("highSAUsers", highSAUsers)

/* clustersContainer.foreach(println)
* Now you can see
* (lowSAUsers,ListBuffer((590,0)))
* (mediumLowSAUsers,ListBuffer((533,1)))
* (mediumHighSAUsers,ListBuffer())
* (highSAUsers,ListBuffer((571,2)))
*/

// ---------------------------------------------------------------------

Best Answer

How about creating an array of tuples representing the different bins:

// bin edges; seeding with -1.0 (instead of 0) lets an out-degree of 0
// fall into the first bin as well, matching the <= soglia25 case above
val bins = Seq(-1.0, soglia25, soglia50, soglia75, soglia100)
  .sliding(2)
  .map(seq => (seq(0), seq(1)))
  .toArray
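With the three-vertex sample above (so maxNumOutDegreeEdgesPerNode = 2, giving soglia25 = 0.5, soglia50 = 1.0, soglia75 = 1.5, soglia100 = 2.0), bins would contain:

// Array((-1.0,0.5), (0.5,1.0), (1.0,1.5), (1.5,2.0))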

Then, for each element of your RDD, find its corresponding bin, use that as the key, wrap the id in a Seq, and reduce by key:

def getBin(bins: Array[(Double, Double)], value: Int): Int = {
  // indexWhere returns -1 when the value falls outside every bin
  bins.indexWhere { case (a: Double, b: Double) => a < value && b >= value }
}

userNodesOutDegreesRdd.map {
  case (id, value) => (getBin(bins, value), Seq(id))
}.reduceByKey(_ ++ _)
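As a usage sketch on the sample data above: since outDegrees omits vertices with 0 outgoing edges, you would apply the same map to completeUserNodesOutDregreesRdd instead to keep vertex 590 in play; its values are typed Any, hence the explicit conversion below.

val clusters = completeUserNodesOutDregreesRdd.map {
  case (id, value) => (getBin(bins, value.toString.toInt), Seq(id))
}.reduceByKey(_ ++ _)

clusters.collect.foreach(println)
// expected for the sample (ordering may vary):
// (0,List(590))   out-degree 0 -> bin (-1.0, 0.5]
// (1,List(533))   out-degree 1 -> bin (0.5, 1.0]
// (3,List(571))   out-degree 2 -> bin (1.5, 2.0]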

Regarding scala - Spark and Scala: Apply a function to each element of an RDD, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48261414/
