gpt4 book ai didi

apache-spark - 使用数据集在 Apache Spark 中交叉加入非常慢

转载 作者:行者123 更新时间:2023-12-04 01:47:26 25 4
gpt4 key购买 nike

我已经在 spark 用户论坛上发布了这个问题,但没有收到任何回复,所以再次在这里提问。

我们有一个用例,我们需要进行笛卡尔连接,但出于某种原因我们无法让它与数据集 API 一起工作。

我们有两个数据集:

  • one data set with 2 string columns say c1, c2. It is a small data set with ~1 million records. The two columns are both strings of 32 characters so should be less than 500 mb.

    We broadcast this dataset

  • 另一个数据集稍大一些,大约有 1000 万条记录
val ds1 = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2")
ds1.count
val ds2 = spark.read.format("csv").load(<s3-location>).toDF("c11", "c12", "c13", "c14", "c15", "ts")
ds2.count
ds2.crossJoin(broadcast(ds1)).filter($"c1" <= $"c11" && $"c11" <= $"c2").count

如果我使用 RDD api 实现它,我在 ds1 中广播数据,然后在 ds2 中过滤数据,它工作正常。

确认广播成功

2019-02-14 23:11:55 INFO CodeGenerator:54 - Code generated in 10.469136 ms 2019-02-14 23:11:55 INFO TorrentBroadcast:54 - Started reading broadcast variable 29 2019-02-14 23:11:55 INFO TorrentBroadcast:54 - Reading broadcast variable 29 took 6 ms 2019-02-14 23:11:56 INFO CodeGenerator:54 - Code generated in 11.280087 ms

查询计划:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Cross, ((c1#68 <= c11#13) && (c11#13 <= c2#69))
:- *Project []
: +- *Filter isnotnull(_c0#0)
: +- *FileScan csv [_c0#0,_c1#1,_c2#2,_c3#3,_c4#4,_c5#5] Batched: false, Format: CSV, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(_c0)], ReadSchema: struct<_c0:string,_c1:string,_c2:string,_c3:string,_c4:string,_c5:string>
+- BroadcastExchange IdentityBroadcastMode
+- *Project [c1#68, c2#69]
+- *Filter (isnotnull(c1#68) && isnotnull(c2#69))
+- *FileScan csv [c1#68,c2#69] Batched: false, Format: CSV, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(c1), IsNotNull(c2)], ReadSchema: struct

那么这个阶段就没有进展了。

我更新了代码以使用广播 ds1,然后在 ds2 的 mapPartitions 中加入。

val ranges = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2").collect
val rangesBC = sc.broadcast(ranges)

然后在 mapPartitions 方法中使用这个 rangesBC 来识别 ds2 中每一行所属的范围,这个作业在 3 小时内完成,而另一个作业甚至在 24 小时后也没有完成。这种暗示查询优化器没有做我想要它做的事情。

我做错了什么?任何指针都会有所帮助。谢谢!

最佳答案

我最近遇到了这个问题,发现 Spark 在交叉连接大型数据帧时有一种奇怪的分区行为。如果您的输入数据框包含几百万条记录,那么交叉连接数据框的分区等于输入数据框分区的乘积,即

crossJoinDF 的分区 =(ds1 的分区)*(ds2 的分区)。

如果 ds1 或 ds2 包含大约几百个分区,则交叉连接数据帧的分区范围约为 10,000。这些分区太多了,这会导致管理许多小任务的开销过大,从而使交叉连接数据帧上的任何计算(在您的情况下为过滤器)运行速度非常慢。

那么如何使计算速度更快呢?首先检查这是否确实是您的问题所在:

scala> val crossJoinDF = ds2.crossJoin(ds1)
# This should return immediately because of spark lazy evaluation

scala> val crossJoinDFPartitions = crossJoinDF.rdd.partitions.size

检查交叉连接数据框上的分区数。如果 crossJoinDFPartitions > 10,000,那么您确实遇到了同样的问题,即交叉连接数据帧的分区太多了。

为了加快交叉连接数据帧的操作速度,请减少输入数据帧的分区数量。例如:

scala> val ds1 = ds1.repartition(40)
scala> ds1.rdd.partitions.size
res80: Int = 40

scala> val ds2 = ds2.repartition(40)
scala> ds2.rdd.partitions.size
res81: Int = 40

scala> val crossJoinDF = ds1.crossJoin(ds2)
scala> crossJoinDF.rdd.partitions.size
res82: Int = 1600

scala> crossJoinDF.count()

count() 操作应该导致交叉连接的执行。计数现在应该会在合理的时间内返回。您选择的确切分区数将取决于集群中可用的核心数。

此处的关键要点是确保您的交叉连接数据框具有合理数量的分区 (<< 10,000)。您可能还会发现 this post有用,它更详细地解释了这个问题。

关于apache-spark - 使用数据集在 Apache Spark 中交叉加入非常慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54713765/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com