
scala - Split an RDD into RDDs with no duplicate values


I have an RDD of pairs:

(105,918)
(105,757)
(502,516)
(105,137)
(516,816)
(350,502)

I want to split it into two RDDs, such that the first contains only pairs with non-repeating values (across both keys and values), and the second contains the remaining, omitted pairs.

So from the above we would get two RDDs:

1) (105,918)
   (502,516)

2) (105,757) - omitted, as 105 is already included in the 1st RDD
   (105,137) - omitted, as 105 is already included in the 1st RDD
   (516,816) - omitted, as 516 is already included in the 1st RDD
   (350,502) - omitted, as 502 is already included in the 1st RDD

Currently I am using a mutable Set variable to track the elements already selected, after coalescing the original RDD into a single partition:

val evalCombinations = collection.mutable.Set.empty[Int]
val currentValidCombinations = allCombinations
  .filter(p => {
    if (!evalCombinations.contains(p._1) && !evalCombinations.contains(p._2)) {
      evalCombinations += p._1
      evalCombinations += p._2
      true
    } else
      false
  })

This approach is limited by the memory of the executor the operation runs on. Is there a better, more scalable solution?
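(For context, the single-partition step matters here: on a multi-partition RDD, each task would receive its own copy of the mutable set, so the filter would not be consistent. A minimal sketch of that precondition, where originalRdd is a hypothetical name for the source RDD:)

// Hypothetical sketch: force one partition so a single task sees every pair;
// on a multi-partition RDD each partition's closure would get its own Set.
val allCombinations = originalRdd.coalesce(1)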

Best Answer

Here is a version that instead needs more driver memory, since it collects sets of distinct elements to the driver.

import org.apache.spark.rdd._
import org.apache.spark._

def getUniq(rdd: RDD[(Int, Int)], sc: SparkContext): RDD[(Int, Int)] = {

  val keys = rdd.keys.distinct
  val values = rdd.values.distinct

  // these are the keys which also appear on the value side
  val both = keys.intersection(values)

  val bBoth = sc.broadcast(both.collect.toSet)

  // remove those key-value pairs whose value is also a key
  val uKeys = rdd.filter(x => !bBoth.value.contains(x._2))
    .reduceByKey { case (v1, v2) => v1 }   // keep one pair per key

  uKeys.map { case (k, v) => (v, k) }      // swap key and value
    .reduceByKey { case (v1, v2) => v1 }   // keep one pair per value
    .map { case (k, v) => (v, k) }         // swap back
}
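On the sample pairs from the question, getUniq first drops (502,516) and (350,502), since their values 516 and 502 also occur as keys, and then keeps one pair per remaining key and per remaining value. A quick sketch of a single call (which of the duplicate-key pairs survives reduceByKey is arbitrary):

val sample = sc.parallelize(Array((105,918), (105,757), (502,516),
                                  (105,137), (516,816), (350,502)))
getUniq(sample, sc).collect
// e.g. Array((105,918), (516,816)) -- (350,502) was filtered out up front,
// which is why getPartitionedRDDs below runs a second pass to recover it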

def getPartitionedRDDs(rdd: RDD[(Int, Int)], sc: SparkContext) = {

  val r = getUniq(rdd, sc)                 // first pass of unique pairs
  val remaining = rdd subtract r
  val set = r.flatMap { case (k, v) => Array(k, v) }.collect.toSet
  val a = remaining.filter { case (x, y) =>
    !set.contains(x) && !set.contains(y)   // pairs untouched by the first pass
  }
  val b = getUniq(a, sc)                   // second pass over those pairs
  val part1 = r union b
  val part2 = rdd subtract part1
  (part1, part2)
}

val rdd = sc.parallelize(Array((105,918), (105,757), (502,516),
                               (105,137), (516,816), (350,502)))

val (first, second) = getPartitionedRDDs(rdd, sc)
// first.collect:  ((516,816), (105,918), (350,502))
// second.collect: ((105,137), (502,516), (105,757))

val rdd1 = sc.parallelize(Array((839,841), (842,843), (840,843),
                                (839,840), (1,2), (1,3), (4,3)))

val (f, s) = getPartitionedRDDs(rdd1, sc)
// f.collect: ((839,841), (1,2), (840,843), (4,3))
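// s.collect: ((842,843), (839,840), (1,3)) -- the complement of f within rdd1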

Regarding "scala - Split an RDD into RDDs with no duplicate values", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/29482121/
