gpt4 book ai didi

apache-spark - 缓慢加入pyspark,尝试重新分区

转载 作者:行者123 更新时间:2023-12-04 07:13:44 25 4
gpt4 key购买 nike

我正在尝试在 Spark 3 上左连接 2 个表,其中包含 17M 行(事件)和 400M 行(详细信息)。有一个 1 + 15 x 64core 实例的 EMR 集群。 (r6g.16xlarge 尝试过类似的 r5a)源文件是从 S3 加载的未分区 Parquet 。

这是我用来加入的代码:

join = (
broadcast(events).join(
details,
[
details["a"] == events["a2"],
(unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
> 5,
],
"left",
)
).drop("a")

join.checkpoint()

要分区,我正在使用这个:

executors = 15 * 64 * 3  # 15 instances, 64 cores, 3 workers per core

所以我尝试了:

details = details.repartition(executors, "a")

details = details.withColumn("salt", (rand(seed=42) * nSaltBins).cast("int"))
details = details.repartition(executors, "salt")

在这两种情况下,90% 的工作人员在 5-10 分钟左右结束,其余工作人员继续很长时间(50 分钟以上),长绿线,日志上没有内存或磁盘错误。

分区后有一点偏斜(所有分区在 180k 到 160k 行之间),处理器时间超过 50 分钟没有任何问题。

知道我可以监督什么吗?看了一大堆帖子,还是觉得绿线(worker时间)之间应该更近一些,都是同时开始的,不是在等一个worker结束。

谢谢!

--编辑---删除广播

在作业 11 的第 17 阶段,它在 2 分钟内完成 974/1000,30 分钟后仍然在 993/1000,上一步使用加盐分区(由 executors 变量给出)并且速度非常快。

执行计划:

Using 17906254 events
== Physical Plan ==
AdaptiveSparkPlan (13)
+- Project (12)
+- SortMergeJoin LeftOuter (11)
:- Sort (4)
: +- Exchange (3)
: +- Project (2)
: +- Scan parquet (1)
+- Sort (10)
+- Exchange (9)
+- Exchange (8)
+- Project (7)
+- Filter (6)
+- Scan parquet (5)

as image

2 小时及以上时间的 25% 的示例是剩余 1 个执行者 enter image description here

当前 Spark 配置:

spark = SparkSession.builder.appName('Test').config("spark.driver.memory", "108g").config(
"spark.executor.instances", "59").config("spark.executor.memoryOverhead", "13312").config(
"spark.executor.memory", "108g").config("spark.executor.cores", "15").config("spark.driver.cores", "15").config(
"spark.default.parallelism", "1770").config("spark.sql.adaptive.enabled", "true").config(
"spark.sql.adaptive.skewJoin.enabled", "true").config("spark.sql.shuffle.partitions", "885").getOrCreate()

最佳答案

你的问题看起来像是倾斜连接的一个很好的例子,其中一些分区会比其他分区获得更多的数据,从而减慢整个工作。

在加入之前重新分区您的数据框不会有帮助,因为 SortMergeJoin 操作将在您的连接键上再次重新分区以处理连接

由于您使用的是 Spark 3,因此您应该支持 automatic skewJoin management .

要使用它,请确保您同时拥有 spark.sql.adaptive.enabled=true(在标准 Spark 发行版中默认为 false)和 spark.sql.adaptive.skewJoin。启用=真

如果你不能使用自动 skewJoin 优化,你可以用这样的东西手动修复它:

  • 将您的小数据集复制 N 倍
n = 10   # Chose an appropriate amount based on skewness
skewedEvents = events.crossJoin(spark.range(0,n).withColumnRenamed("id","eventSalt"))
  • 使用介于 0 和 N 之间的随机列值为您的大型数据集播种
import pyspark.sql.functions as f

skewedDetails = details.withColumn("detailSalt", (f.rand() * n).cast("int"))
  • 在连接键中使用您的盐加入,然后放下盐
joined = skewedEvents.join(skewedDetails,[         [
skewedDetails["a"] == skewedEvents["a2"],
skewedDetails["detailSalt"] == skewedEvents["eventSalt"],
(unix_timestamp(skewedEvents["date"]) - unix_timestamp(skewedDetails["date"])) / 3600
> 5,
],
"left")\
.filter("a is not null or (a is null and eventSalt = 0)")\
.drop("a").drop("eventSalt").drop("detailSalt")

请注意,还可能需要验证您的查询连接条件,因为 UI 显示在详细信息上处理了 3.33 亿行,在事件上处理了 1700 万行,您生成了超过 50 亿的输出行,因此您可以匹配更多您认为的行您的加入条件。

关于apache-spark - 缓慢加入pyspark,尝试重新分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68899346/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com