gpt4 book ai didi

scala - Spark - 如何将 rdd 的前 N ​​个作为新的 rdd(无需在驱动程序处收集)

转载 作者:行者123 更新时间:2023-12-04 14:09:02 28 4
gpt4 key购买 nike

我想知道如何过滤具有前 N 个值之一的 RDD。通常我会对 RDD 进行排序并取 top N 个项目作为驱动程序中的一个数组,以找到可以广播的第 N 个值来过滤 rdd,如下所示:

val topNvalues = sc.broadcast(rdd.map(_.fieldToThreshold).distict.sorted.take(N))
val threshold = topNvalues.last
val rddWithTopNValues = rdd.filter(_.fieldToThreshold >= threshold)

但在这种情况下,我的 N 太大了,所以我怎么能像这样纯粹使用 RDD 来做到这一点?:
def getExpensiveItems(itemPrices: RDD[(Int, Float)], count: Int): RDD[(Int, Float)] = {
val sortedPrices = itemPrices.sortBy(-_._2).map(_._1).distinct

// How to do this without collecting results to driver??
val highPrices = itemPrices.getTopNValuesWithoutCollect(count)

itemPrices.join(highPrices.keyBy(x => x)).map(_._2._1)
}

最佳答案

使用 zipWithIndex在排序的 rdd 上,然后按索引过滤最多 n 个项目。为了说明这种情况,请考虑按降序排序的这个 rrd,

val rdd = sc.parallelize((1 to 10).map( _ => math.random)).sortBy(-_)

然后
rdd.zipWithIndex.filter(_._2 < 4)

将前四项交付给驱动程序,而无需收集 rdd。

关于scala - Spark - 如何将 rdd 的前 N ​​个作为新的 rdd(无需在驱动程序处收集),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47559602/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com