gpt4 book ai didi

join - 如何有效地将大 rdd 加入 spark 中非常大的 rdd?

转载 作者:行者123 更新时间:2023-12-04 02:58:34 24 4
gpt4 key购买 nike

我有两个 RDD。一个 RDD 在 5-1000 万个条目之间,而另一个 RDD 在 50000 万-75000 万个条目之间。在某些时候,我必须使用一个公共(public) key 加入这两个 rdds。

val rddA = someData.rdd.map { x => (x.key, x); } // 10-million
val rddB = someData.rdd.map { y => (y.key, y); } // 600-million
var joinRDD = rddA.join(rddB);

当 spark 决定做这个 join 时,它决定做一个 ShuffledHashJoin。这会导致 rddB 中的许多项目在网络上被洗牌。同样,一些 rddA 也在网络上被洗牌。在这种情况下,rddA 太大而不能用作广播变量,但似乎 BroadcastHashJoin 会更有效。是否有提示使用 BroadcastHashJoin? (Apache Flink 通过连接提示支持这一点)。

如果没有,是增加 autoBroadcastJoinThreshold 的唯一选择吗?

更新 7/14

我的性能问题似乎完全 Root 于重新分区。通常,从 HDFS 读取的 RDD 会按 block 分区,但在这种情况下,源是 [我制作的] parquet 数据源。当 spark (databricks) 写入 parquet 文件时,每个分区写入一个文件,同样,每个文件读取一个分区。因此,我发现的最佳答案是,在数据源的生产过程中,通过键对其进行分区,然后写出 Parquet 接收器(然后自然地共同分区)并将其用作 rddB。

给出的答案是正确的,但我认为有关 parquet 数据源的详细信息可能对其他人有用。

最佳答案

您可以使用相同的分区器对 RDD 进行分区,在这种情况下,具有相同键的分区将被配置在同一个执行器上。

在这种情况下,您将避免连接操作的随机播放。

Shuffle 只会发生一次,当你更新 parititoner 时,如果你要缓存 RDD 的所有连接,那之后应该是本地的 executors

import org.apache.spark.SparkContext._

class A
class B

val rddA: RDD[(String, A)] = ???
val rddB: RDD[(String, B)] = ???

val partitioner = new HashPartitioner(1000)

rddA.partitionBy(partitioner).cache()
rddB.partitionBy(partitioner).cache()

您也可以尝试更新广播阈值大小,也许 rddA 可以广播:
--conf spark.sql.autoBroadcastJoinThreshold=300000000 # ~300 mb

我们使用 400mb 进行广播连接,效果很好。

关于join - 如何有效地将大 rdd 加入 spark 中非常大的 rdd?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31392261/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com