gpt4 book ai didi

scala - 这是在 RDD 上实现惰性 `take` 的合适方法吗?

转载 作者:行者123 更新时间:2023-12-01 13:40:16 24 4
gpt4 key购买 nike

非常不幸的是,RDD 上的 take 是一个严格的操作而不是惰性操作,但我不会解释为什么我现在认为这是一个令人遗憾的设计。

我的问题是这是否是 RDD 的惰性 take 的合适实现。它似乎有效,但我可能遗漏了一些不明显的问题。

def takeRDD[T: scala.reflect.ClassTag](rdd: RDD[T], num: Long): RDD[T] =
new RDD[T](rdd.context, List(new OneToOneDependency(rdd))) {
// An unfortunate consequence of the way the RDD AST is designed
var doneSoFar = 0L

def isDone = doneSoFar >= num

override def getPartitions: Array[Partition] = rdd.partitions

// Should I do this? Doesn't look like I need to
// override val partitioner = self.partitioner

override def compute(split: Partition, ctx: TaskContext): Iterator[T] = new Iterator[T] {
val inner = rdd.compute(split, ctx)

override def hasNext: Boolean = !isDone && inner.hasNext

override def next: T = {
doneSoFar += 1
inner.next
}
}
}

最佳答案

回答你的问题

不,这行不通。没有办法让一个变量可以在 Spark 集群中同时看到和更新,而这正是您要使用 doneSoFar 作为。如果您尝试这样做,那么当您运行 compute(跨多个节点并行)时,您将

a) 序列化任务中的 takeRDD,因为您引用了类变量 doneSoFar。这意味着您将类写入字节并在每个 JVM(执行程序)中创建一个新实例

b) 在计算中更新doneSoFar,这会更新每个执行程序 JVM 上的本地实例。您将从每个分区中获取数量等于 num 的元素。

由于那里的某些 JVM 属性,这可能会在 Spark 本地模式下工作,但在集群模式下运行 Spark 时肯定不会工作。

为什么take是一个 Action ,而不是转换

RDD 是分布式的,因此子集化为精确数量的元素是一个低效的操作——它不能完全并行完成,因为每个分片都需要关于其他分片的信息(比如是否应该计算它). Take 非常适合将分布式数据带回本地内存。

rdd.sample 是分布式世界中的类似操作,可以轻松并行运行。

关于scala - 这是在 RDD 上实现惰性 `take` 的合适方法吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40918222/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com