- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试在 Spark 3 上左连接 2 个表,其中包含 17M 行(事件)和 400M 行(详细信息)。有一个 1 + 15 x 64core 实例的 EMR 集群。 (r6g.16xlarge 尝试过类似的 r5a)源文件是从 S3 加载的未分区 Parquet 。
这是我用来加入的代码:
join = (
broadcast(events).join(
details,
[
details["a"] == events["a2"],
(unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
> 5,
],
"left",
)
).drop("a")
join.checkpoint()
要分区,我正在使用这个:
executors = 15 * 64 * 3 # 15 instances, 64 cores, 3 workers per core
所以我尝试了:
details = details.repartition(executors, "a")
和
details = details.withColumn("salt", (rand(seed=42) * nSaltBins).cast("int"))
details = details.repartition(executors, "salt")
在这两种情况下,90% 的工作人员在 5-10 分钟左右结束,其余工作人员继续很长时间(50 分钟以上),长绿线,日志上没有内存或磁盘错误。
分区后有一点偏斜(所有分区在 180k 到 160k 行之间),处理器时间超过 50 分钟没有任何问题。
知道我可以监督什么吗?看了一大堆帖子,还是觉得绿线(worker时间)之间应该更近一些,都是同时开始的,不是在等一个worker结束。
谢谢!
--编辑---删除广播
在作业 11 的第 17 阶段,它在 2 分钟内完成 974/1000,30 分钟后仍然在 993/1000,上一步使用加盐分区(由 executors 变量给出)并且速度非常快。
执行计划:
Using 17906254 events
== Physical Plan ==
AdaptiveSparkPlan (13)
+- Project (12)
+- SortMergeJoin LeftOuter (11)
:- Sort (4)
: +- Exchange (3)
: +- Project (2)
: +- Scan parquet (1)
+- Sort (10)
+- Exchange (9)
+- Exchange (8)
+- Project (7)
+- Filter (6)
+- Scan parquet (5)
当前 Spark 配置:
spark = SparkSession.builder.appName('Test').config("spark.driver.memory", "108g").config(
"spark.executor.instances", "59").config("spark.executor.memoryOverhead", "13312").config(
"spark.executor.memory", "108g").config("spark.executor.cores", "15").config("spark.driver.cores", "15").config(
"spark.default.parallelism", "1770").config("spark.sql.adaptive.enabled", "true").config(
"spark.sql.adaptive.skewJoin.enabled", "true").config("spark.sql.shuffle.partitions", "885").getOrCreate()
最佳答案
你的问题看起来像是倾斜连接的一个很好的例子,其中一些分区会比其他分区获得更多的数据,从而减慢整个工作。
在加入之前重新分区您的数据框不会有帮助,因为 SortMergeJoin 操作将在您的连接键上再次重新分区以处理连接
由于您使用的是 Spark 3,因此您应该支持 automatic skewJoin management .
要使用它,请确保您同时拥有 spark.sql.adaptive.enabled=true
(在标准 Spark 发行版中默认为 false)和 spark.sql.adaptive.skewJoin。启用=真
如果你不能使用自动 skewJoin 优化,你可以用这样的东西手动修复它:
n = 10 # Chose an appropriate amount based on skewness
skewedEvents = events.crossJoin(spark.range(0,n).withColumnRenamed("id","eventSalt"))
import pyspark.sql.functions as f
skewedDetails = details.withColumn("detailSalt", (f.rand() * n).cast("int"))
joined = skewedEvents.join(skewedDetails,[ [
skewedDetails["a"] == skewedEvents["a2"],
skewedDetails["detailSalt"] == skewedEvents["eventSalt"],
(unix_timestamp(skewedEvents["date"]) - unix_timestamp(skewedDetails["date"])) / 3600
> 5,
],
"left")\
.filter("a is not null or (a is null and eventSalt = 0)")\
.drop("a").drop("eventSalt").drop("detailSalt")
请注意,还可能需要验证您的查询连接条件,因为 UI 显示在详细信息上处理了 3.33 亿行,在事件上处理了 1700 万行,您生成了超过 50 亿的输出行,因此您可以匹配更多您认为的行您的加入条件。
关于apache-spark - 缓慢加入pyspark,尝试重新分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68899346/
我想对 JOIN 进行特定的排序 SELECT * FROM (lives_in as t1 NATURAL JOIN preferences p1) l1 JOIN (lives_in t2 NAT
我正在努力解决一个查询。并想知道是否有人可以提供帮助。 我有一个标签表(服务请求票)和序列号表 从我的标签中我正在这样做 Select * from tag where tag.created BET
关闭。这个问题需要多问focused 。目前不接受答案。 想要改进此问题吗?更新问题,使其仅关注一个问题 editing this post . 已关闭 7 年前。 Improve this ques
我有两个表 tbl_user 和 tbl_lastchangepassword,如下所示 表 tbl_user id| name --------- 1 | user1 2 | user2 3 |
我有下一个问题 SELECT i.*, gu.* vs.* FROM common.global_users gu LEFT JOIN common.global_users_perms gup ON
我有一个电影表和一个投票表。用户为他们喜欢的电影投票。我需要显示按电影总票数降序排列的电影列表。我现在所拥有的有点作品。唯一的问题是它不显示 0 票的电影。 SELECT m.name, m.imdb
我有一个由这样的表组成的 mySql 数据库: 我如何(如果可能的话)使用 JOINS 从名称/周期表中获取结果?简单来说,它是如何工作的?我向菜鸟问题道歉。我对此很陌生。任何帮助将不胜感激。 最佳答
我需要查询单元先决条件的自引用关系。 我知道您需要使用两个联接,我是否选择我的列然后将其联接到自身? SELECT u.unit_code, u.name + ' is a prerequisi
我有两个实体,用户和友谊,它们看起来像: public class User { public int UserId { get; set; } (..
假设我有两个表: Table A ProdID | PartNumber | Data... 1 | ABC-a | "Data A" 2 | (null) |
说我有这个数据, (df <- data.frame( col1 = c('My','Your','His','Thir'), col2 = c('Cat','Dog','Fish','Dog')))
我有两个这样的数组,实际上这是从两个不同的服务器检索的 mysql 数据: $array1 = array ( 0 => array ( 'id' => 1, 'n
我的数据库中有以下表格 CREATE TABLE [author_details] ( [_id] INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, [name
我正在努力使用一个相当简单的 sql select 语句的 join/where 子句。 我正在尝试从 tb1 中检索产品信息列表,其中 where 条件位于 tbl2 中,但这必须由三个不同的列连接
我正在寻找以下功能: Applicative f => f (f a) -> f a Hoogle给我看join : >:t join join :: Monad m => m (m a) -> m
我有两个“表”,分别是 USER 和 CONGE。在表“CONGE”中,我插入了用户的 ID。但是我不知道如何根据用户的id显示用户的休假。 我想根据id发布“Congé”。 { "conge"
我们有一个具有(简化)结构的文档,如Elasticsearch所示: { _id: ..., patientId: 4711, text: "blue" } { _id: ..., patientId
这两个sql语句有什么区别 a) 从 T1,T2 中选择 *,其中 T1.A=T2.A ; b) 从 T1,T2 中选择 *,其中 T2.A=T1.A ; 在这两种情况下我得到相同的输出,这两种语句之
我想做一个简单的连接,只是比较两个表中的 ID.. 我有我的组表,包含; 身份证 姓名 等.. 我的 GroupMap 表包含; 身份证 组号 元素编号 我的查询采用 GroupMap.ItemID
所以我有一组主要数据,如下所示: value_num code value_letter 1 CDX A 2 DEF B
我是一名优秀的程序员,十分优秀!