- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 VertexRDD[(VertexId, Long)] 的 RDD,结构如下:
(533, 1)
(571, 2)
(590, 0)
...
其中,每个元素由顶点 id(533、571、590 等)及其出边数(1、2、0 等)组成。
我想对这个 RDD
的每个元素应用一个函数。此函数必须执行传出边数和 4 个阈值之间的比较。
如果出边的数量小于或等于4个阈值之一,则必须将相应的顶点id插入到Array
(或其他类似的数据结构)中,以获得最后是4个数据结构,每个数据结构都包含满足与相应阈值比较的顶点的id。
我需要将满足与相同阈值比较的id累积在相同的数据结构中。我如何使用 Spark
和 Scala
并行和实现这种方法?
我的代码:
val usersGraphQuery = "MATCH (u1:Utente)-[p:PIU_SA_DI]->(u2:Utente) RETURN id(u1), id(u2), type(p)"
val usersGraph = neo.rels(usersGraphQuery).loadGraph[Any, Any]
val numUserGraphNodes = usersGraph.vertices.count
val numUserGraphEdges = usersGraph.edges.count
val maxNumOutDegreeEdgesPerNode = numUserGraphNodes - 1
// get id and number of outgoing edges of each node from the graph
// except those that have 0 outgoing edges (default behavior of the outDegrees API)
var userNodesOutDegreesRdd: VertexRDD[Int] = usersGraph.outDegrees
/* userNodesOutDegreesRdd.foreach(println)
* Now you can see
* (533, 1)
* (571, 2)
*/
// I also get ids of nodes with zero outgoing edges
var fixedGraph: Graph[Any, Any] = usersGraph.outerJoinVertices(userNodesOutDegreesRdd)( (vid: Any, defaultOutDegrees: Any, outDegOpt: Option[Any]) => outDegOpt.getOrElse(0L) )
var completeUserNodesOutDregreesRdd = fixedGraph.vertices
/* completeUserNodesOutDregreesRdd.foreach(println)
* Now you can see
* (533, 1)
* (571, 2)
* (590, 0) <--
*/
// 4 thresholds that identify the 4 clusters of User nodes based on the number of their outgoing edges
var soglia25: Double = (maxNumOutDegreeEdgesPerNode.toDouble/100)*25
var soglia50: Double = (maxNumOutDegreeEdgesPerNode.toDouble/100)*50
var soglia75: Double = (maxNumOutDegreeEdgesPerNode.toDouble/100)*75
var soglia100: Double = maxNumOutDegreeEdgesPerNode
println("soglie: "+soglia25+", "+soglia50+", "+soglia75+", "+soglia100)
// containers of individual clusters
var lowSAUsers = new ListBuffer[(Long, Any)]()
var mediumLowSAUsers = new ListBuffer[(Long, Any)]()
var mediumHighSAUsers = new ListBuffer[(Long, Any)]()
var highSAUsers = new ListBuffer[(Long, Any)]()
// overall container of the 4 clusters
var clustersContainer = new ListBuffer[ (String, ListBuffer[(Long, Any)]) ]()
// I WANT PARALLEL FROM HERE -----------------------------------------------
// from RDD to Array
var completeUserNodesOutDregreesArray = completeUserNodesOutDregreesRdd.take(numUserGraphNodes.toInt)
// analizzo ogni nodo Utente e lo assegno al cluster di appartenenza
for(i<-0 to numUserGraphNodes.toInt-1) {
// confronto il valore del numero di archi in uscita (convertito in stringa)
// con le varie soglie per determinare in quale classe inserire il relativo nodo Utente
if( (completeUserNodesOutDregreesArray(i)._2).toString().toLong <= soglia25 ) {
println("ok soglia25 ")
lowSAUsers += completeUserNodesOutDregreesArray(i)
}else if( (completeUserNodesOutDregreesArray(i)._2).toString().toLong <= soglia50 ){
println("ok soglia50 ")
mediumLowSAUsers += completeUserNodesOutDregreesArray(i)
}else if( (completeUserNodesOutDregreesArray(i)._2).toString().toLong <= soglia75 ){
println("ok soglia75 ")
mediumHighSAUsers += completeUserNodesOutDregreesArray(i)
}else if( (completeUserNodesOutDregreesArray(i)._2).toString().toLong <= soglia100 ){
println("ok soglia100 ")
highSAUsers += completeUserNodesOutDregreesArray(i)
}
}
// I put each cluster in the final container
clustersContainer += Tuple2("lowSAUsers", lowSAUsers)
clustersContainer += Tuple2("mediumLowSAUsers", mediumLowSAUsers)
clustersContainer += Tuple2("mediumHighSAUsers", mediumHighSAUsers)
clustersContainer += Tuple2("highSAUsers", highSAUsers)
/* clustersContainer.foreach(println)
* Now you can see
* (lowSAUsers,ListBuffer((590,0)))
* (mediumLowSAUsers,ListBuffer((533,1)))
* (mediumHighSAUsers,ListBuffer())
* (highSAUsers,ListBuffer((571,2)))
*/
// ---------------------------------------------------------------------
最佳答案
如何创建一个表示不同 bin 的元组数组:
val bins = Seq(0, soglia25, soglia50, soglia75, soglia100).sliding(2)
.map(seq => (seq(0), seq(1))).toArray
然后为你的 RDD 的每个元素找到一个对应的 bin,将其作为键,将 id 转换为 Seq 并按键归约:
def getBin(bins: Array[(Double, Double)], value: Int): Int = {
bins.indexWhere {case (a: Double, b: Double) => a < value && b >= value}
}
userNodesOutDegreesRdd.map {
case (id, value) => (getBin(bins, value), Seq(id))
}.reduceByKey(_ ++ _)
关于scala - Spark 和 Scala : Apply a function to each element of an RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48261414/
我被这种奇怪的事情难住了。 假设我有这个数组: var array = [{ something: 'special' }, 'and', 'a', 'bunch', 'of', 'paramet
假设我们有这样的代码: let fn1 = Function.apply.bind(Math.max, null); fn1([1, 10, 5]); // returns 10 我知道它是 ES6
所以我尝试通过数据绑定(bind)调用我的 viewModel 原型(prototype)上的方法。我通过“单击”将两个不同的元素数据绑定(bind)到同一方法。当我单击第一个按钮(“新游戏”按钮)时
观察以下代码 trait Example { type O def apply(o: O) def f(o: O) = this.apply(o) } 在Scala中编译良好。我希望我可以
我知道 apply f in H 可用于将假设应用于函数,并且我知道 apply f with a b c 可用于提供参数直接应用 f 时,它无法自行推断。 是否可以以某种方式将两者结合使用? 最佳答
这个问题已经有答案了: How to override apply in a case class companion (10 个回答) 已关闭 6 年前。 我正在尝试重载案例类的 apply 方法:
我有一个自定义的Grails 4.x配置文件。我想为我的应用程序生成一个“apply from”条目。 apply from:"${rootProject.projectDir}/gradle/clo
传统上对象继承如下所示: function Parent() { console.log('parent constructor'); } Parent.prototype.method = f
今天在检查Jasmine 的源代码时here我偶然发现了以下内容: if (queueableFn.timeout) { timeoutId = Function.prototype.appl
据我所知,关键字new会使用this创建一个包含函数中定义的属性的对象。但我不知道如何应用 使用 apply 将其他函数链接到该函数。并且创建的对象在这些函数中具有属性。有人能弄清楚代码中发生了什么吗
我一直在我的 InitComponent 中使用 Ext.Apply,就像这样 Ext.apply(that, { xtype: 'form', items: [.
我们有数百个存储库,并定期从上游接收补丁。作业应用这些补丁 git apply --check .如果没有错误,则应用补丁 git apply 并且更改已提交。如果有任何错误,补丁将标记为 conf
我最近通过调用 console.log.toString() 查看了 firebugs console.log 的代码并得到了这个: function () { return Function.app
拿这个代码: $scope.$apply(function(){ $scope.foo = 'test'; }); 对比这个: $scope.foo = 'test'; $scope.$app
我在 Oracle-12c 中有一个类似于典型论坛的架构 accounts , posts , comments .我正在编写一个查询来获取... 一位用户 该用户的所有帖子 对每个帖子的评论 以及每
我试图更好地理解在 Angular 中使用 $timeout 服务作为一种“安全 $apply”方法的细微差别。基本上在一段代码可以运行以响应 Angular 事件或非 Angular 事件(例如 j
到目前为止,我使用的是 this当我有多个时间序列要预测时,我使用了 Hyndman 教授的方法。但是当我有大量的 ts 时它相当慢。 现在我正在尝试使用 apply() 函数,如下所示 librar
我听说过很多关于 pandas apply 很慢的说法,应该尽可能少用。 我这里有个情况: df = pd.DataFrame({'Date': ['2019-01-02', '2019-01-03'
在学习Javascript时,我尝试重新声明函数的apply属性。到目前为止没有问题。 function foo() { return 1; } alert(foo()); // 1 alert(fo
所以我正在做 learnRx http://reactive-extensions.github.io/learnrx/我有一个关于制作 mergeAll() 函数的问题(问题 10)。 这是我的答案
我是一名优秀的程序员,十分优秀!