- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 DataFrame 生成如下:
df.groupBy($"Hour", $"Category")
.agg(sum($"value") as "TotalValue")
.sort($"Hour".asc, $"TotalValue".desc))
结果如下:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
如您所见,DataFrame 按Hour
升序排列,然后按TotalValue
降序排列。
我想选择每个组的第一行,即
所以期望的输出将是:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 1| cat67| 28.5|
| 2| cat56| 39.6|
| 3| cat8| 35.6|
| ...| ...| ...|
+----+--------+----------+
能够选择每个组的前 N 行可能会很方便。
非常感谢任何帮助。
最佳答案
窗口函数:
像这样的东西应该可以解决问题:
import org.apache.spark.sql.functions.{row_number, max, broadcast}
import org.apache.spark.sql.expressions.Window
val df = sc.parallelize(Seq(
(0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
(1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
(2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
(3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")
val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc)
val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
这种方法在数据严重倾斜的情况下效率低下。此问题已由 SPARK-34775 跟踪,并可能在未来得到解决 ( SPARK-37099 )。
普通 SQL 聚合后跟 join
:
或者,您可以加入聚合数据框:
val dfMax = df.groupBy($"hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))
val dfTopByJoin = df.join(broadcast(dfMax),
($"hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
.drop("max_hour")
.drop("max_value")
dfTopByJoin.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
它将保留重复值(如果每小时有多个类别具有相同的总值)。您可以按如下方式删除它们:
dfTopByJoin
.groupBy($"hour")
.agg(
first("category").alias("category"),
first("TotalValue").alias("TotalValue"))
对结构
使用排序:
虽然没有经过很好的测试,但不需要连接或窗口函数的巧妙技巧:
val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
.groupBy($"hour")
.agg(max("vs").alias("vs"))
.select($"Hour", $"vs.Category", $"vs.TotalValue")
dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
使用 DataSet API(Spark 1.6+、2.0+):
Spark 1.6:
case class Record(Hour: Integer, Category: String, TotalValue: Double)
df.as[Record]
.groupBy($"hour")
.reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)
.show
// +---+--------------+
// | _1| _2|
// +---+--------------+
// |[0]|[0,cat26,30.9]|
// |[1]|[1,cat67,28.5]|
// |[2]|[2,cat56,39.6]|
// |[3]| [3,cat8,35.6]|
// +---+--------------+
Spark 2.0 或更高版本:
df.as[Record]
.groupByKey(_.Hour)
.reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)
后两种方法可以利用 map side combine 并且不需要完全洗牌,因此与窗口函数和连接相比,大多数时候应该表现出更好的性能。这些手杖也可以在 completed
输出模式下与 Structured Streaming 一起使用。
不要使用:
df.orderBy(...).groupBy(...).agg(first(...), ...)
它可能看起来有效(尤其是在 local
模式下)但它并不可靠(请参阅 SPARK-16207 , Tzach Zohar 和 linking relevant JIRA issue 归功于 SPARK-30335 )。
同样的注意事项适用于
df.orderBy(...).dropDuplicates(...)
内部使用等效的执行计划。
关于sql - 如何选择每组的第一行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46671378/
SQL、PL-SQL 和 T-SQL 之间有什么区别? 谁能解释一下这三者之间的区别,并提供每一个的相关使用场景? 最佳答案 SQL 是一种对集合进行操作的查询语言。 它或多或少是标准化的,几乎所有关
这个问题已经有答案了: What is the difference between SQL, PL-SQL and T-SQL? (6 个回答) 已关闭 9 年前。 我对 SQL 的了解足以完成我的
我在数据库中有一个 USER 表。该表有一个 RegistrationDate 列,该列有一个默认约束为 GETDATE()。 使用 LINQ 时,我没有为 RegistrationDate 列提供任
我有一个可能属于以下类型的字符串 string expected result 15-th-rp 15 15/12-rp 12 15-12-th
很难说出这里问的是什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或言辞激烈,无法以目前的形式合理回答。如需帮助澄清此问题以便可以重新打开,visit the help center . 9年前关闭
我有一个存储过程(称为 sprocGetArticles),它从文章表中返回文章列表。这个存储过程没有任何参数。 用户可以对每篇文章发表评论,我将这些评论存储在由文章 ID 链接的评论表中。 有什么方
我目前正在做一个 *cough*Oracle*cough* 数据库主题。讲师介绍embedded SQL作为让其他语言(例如 C、C++)与(Oracle)数据库交互的方式。 我自己做了一些数据库工作
SQL Server 中 SQL 语句的最大长度是多少?这个长度是否取决于 SQL Server 的版本? 例如,在 DECLARE @SQLStatement NVARCHAR(MAX) = N'S
这个问题已经有答案了: Simple way to transpose columns and rows in SQL? (9 个回答) 已关闭 8 年前。 CallType
预先感谢您对此提供的任何帮助。 假设我有一个查询,可以比较跨年的数据,从某个任意年份开始,永无止境(进入 future ),每年同一时期直到最后一个完整的月份(其特点是一月数据永远不会显示至 2 月
我在数据库中有一个 USER 表。该表有一个 RegistrationDate 列,该列的默认约束为 GETDATE()。 使用 LINQ 时,我没有为 RegistrationDate 列提供任何数
下面是我试图用来检查存储过程是否不存在然后创建过程的 sql。它会抛出一个错误:Incorrect syntax near the keyword 'PROCEDURE' IF NOT EXISTS
我有一个同事声称动态 SQL 在许多情况下比静态 SQL 执行得更快,所以我经常看到 DSQL 到处都是。除了明显的缺点,比如在运行之前无法检测到错误并且更难阅读,这是否准确?当我问他为什么一直使用
来自 lobodava 的动态 SQL 查询是: declare @sql nvarchar(4000) = N';with cteColumnts (ORDINAL_POSITION, CO
使用 SQL Server 中的存储过程执行动态 SQL 命令的现实优点和缺点是什么 EXEC (@SQL) 对比 EXEC SP_EXECUTESQL @SQL ? 最佳答案 sp_executes
我有这个有效的 SQL 查询: select sum(dbos.Points) as Points, dboseasons.Year from dbo.StatLines dbos i
我正在调试一些构建成功运行的 SQL 命令的代码。 然而,在查询结束时,查询结果似乎被写入了一个文本文件。 完整的查询如下 echo SELECT DATE,DATETABLE,DATE,APPDAT
我有一些创建表的 .sql 文件(MS SQL 数据库): 表_1.sql: IF OBJECT_ID (N'my_schema.table1', N'U') IS NOT NULL DROP TAB
我写了下面的 SQL 存储过程,它一直给我错误@pid = SELECT MAX(... 整个过程是: Alter PROCEDURE insert_partyco @pname varchar(20
我在 SQL Server 2005 中有包含两列 Fruit 和 Color 的表,如下所示 Fruit Colour Apple Red Orange
我是一名优秀的程序员,十分优秀!