- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个数据框
| id | date | KPI_1 | ... | KPI_n
| 1 |2012-12-12 | 0.1 | ... | 0.5
| 2 |2012-12-12 | 0.2 | ... | 0.4
| 3 |2012-12-12 | 0.66 | ... | 0.66
| 1 |2012-12-13 | 0.2 | ... | 0.46
| 4 |2012-12-14 | 0.2 | ... | 0.45
| ...
| 55| 2013-03-15 | 0.5 | ... | 0.55
| id | date | KPI_1 | ... | KPI_n | KPI_1_diff | KPI_n_diff
| 1 |2012-12-12 | 0.1 | ... | 0.5 | 0.1 | 0.5
| 2 |2012-12-12 | 0.2 | ... | 0.4 | 0.2 |0.4
| 3 |2012-12-12 | 0.66 | ... | 0.66 | 0.66 | 0.66
| 1 |2012-12-13 | 0.2 | ... | 0.46 | 0.2-0.1 | 0.46 - 0.66
| 4 |2012-12-13 | 0.2 | ... | 0.45 ...
| ...
| 55| 2013-03-15 | 0.5 | ... | 0.55
val groupedDF = myDF.groupBy("id").agg(
collect_list(struct(col("date",col("KPI_1"))).as("wrapped_KPI_1"),
collect_list(struct(col("date",col("KPI_2"))).as("wrapped_KPI_2")
// up until nth KPI
)
[("2012-12-12",0.1),("2012-12-12",0.2) ...
val window = Window.partitionBy(col("id")).orderBy(col("date")).rowsBetween(Window.unboundedPreceding,0L)
val windowedDF = df.select (
col("id"),
col("date"),
col("KPI_1"),
collect_list(struct(col("date"),col("KPI_1"))).over(window),
collect_list(struct(col("date"),col("KPI_2"))).over(window)
)
[("2012-12-12",0.1)]
[("2012-12-12",0.1), ("2012-12-13",0.1)]
...
最佳答案
我相信窗口方法应该是一个更好的解决方案,但在使用窗口函数之前,您应该根据 id 重新分区数据帧。这只会对数据进行一次混洗,并且所有窗口函数都应该使用已经混洗的数据帧执行。我希望它有帮助。
代码应该是这样的。
val windowedDF = df.repartition(col("id"))
.select (
col("id"),
col("date"),
col("KPI_1"),
col("KPI_2"),
collect_list(struct(col("date"),col("KPI_1"))).over(window),
collect_list(struct(col("date"),col("KPI_2"))).over(window)
)
val list = Seq(( "2", null, 1, 11, 1, 1 ),
( "2", null, 1, 22, 2, 2 ),
( "2", null, 1, 11, 1, 3 ),
( "2", null, 1, 22, 2, 1 ),
( "2", null, 1, 33, 1, 2 ),
( null, "3", 3, 33, 1, 2 ),
( null, "3", 3, 33, 2, 3 ),
( null, "3", 3, 11, 1, 1 ),
( null, "3", 3, 22, 2, 2 ),
( null, "3", 3, 11, 1, 3 )
)
val df = spark.sparkContext.parallelize(list).toDF("c1","c2","batchDate","id", "pv" , "vv")
val c1Window = Window.partitionBy("batchDate", "c1")
val c2Window = Window.partitionBy("batchDate", "c2")
val agg1df = df.withColumn("c1List",collect_list("pv").over(c1Window))
.withColumn("c2List", collect_list("pv").over(c2Window))
val agg2df = df.repartition($"batchDate")
.withColumn("c1List",collect_list("pv").over(c1Window))
.withColumn("c2List", collect_list("pv").over(c2Window))
agg1df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#38], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(batchDate#16, c2#15, 1)
+- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#28], [batchDate#16, c1#14]
+- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(batchDate#16, c1#14, 1)
+- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
+- Scan ExternalRDDScan[obj#6]
agg2df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#60], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
+- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#50], [batchDate#16, c1#14]
+- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(batchDate#16, 1)
+- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
+- Scan ExternalRDDScan[obj#6]
关于apache-spark - spark 窗口函数 VS group by 性能问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54332942/
我想编写一个 linq 表达式,该表达式将返回不包含特定值的 ID。例如,我想返回所有不具有 Value = 30 的不同 ID。 ID, Value 1, 10 1, 20 1, 30 2,
我正在尝试使用 Regexp 匹配 Nmap 命令的输出。可以有两种不同的格式。 第一种格式(当 nmap 可以找到主机名时) Nmap scan report for 2u4n32t-n4 (192
我正在 Visual Studio 2012 上使用 C# 开发一个软件。我使用 MySQL Connector 6.9.1 进行 MySQL 连接。我的软件在我的操作系统(Win8 x64)上运行顺
在 Django 中(使用 django.contrib.auth 时)我可以添加一个 Group到另一个 Group ?即一个Group成为另一个成员(member) Group ? 如果是这样,我
我试图通过使用动态组参数对数据进行分组来循环。 我们可以在循环的 WHERE 条件上使用动态查询,但我不知道是否可以在组条件中使用动态字符串。 以下是用户决定按哪个字段分组,然后根据决定放置其他逻辑的
我有这样的字符串 s = 'MR1|L2-S1x' 模式总是相同的:一个或两个字符,在 [|.+:x-] 中可选地后跟一个数字和一个分隔符。此模式可以重复 6 次。 所以匹配模式很明确。 p = r'
我有一个带有时间戳字段“bar”的表“foo”。如何仅获取查询的最旧时间戳,例如: SELECT foo.bar from foo?我尝试执行以下操作: SELECT MIN(foo.bar) fro
在我的 Django 项目中,我有一个 user_manage 应用程序。 我在 user_manage 应用的 model.py 中创建了一个名为 UserManage 的模型: from djan
所以我有这样的输入: 还有一个模板指令,例如: 看来我只获得了 foo 和 bar 的组。 (为什么?我预计我可能会得到第三组 current-group-key() = '')。
我正在尝试扩展 django.contrib.auth 并遇到将用户添加到组中的情况,这可以通过两种方式完成。我只是想知道为什么会这样,以及其中一种相对于另一种的优势是什么。 最佳答案 他们做完全相同
我使用的是旧的 PHP 脚本,并且此查询有错误。由于我没有使用 mysql 的经验,因此无法修复它。 "SELECT COUNT(p.postid) AS pid, p.*, t.* FROM ".T
我有几行 Objective-C 代码,例如: ABAddressBookRef addressBook; CFErrorRef error = NULL; addressBook = ABAddre
我正在使用 MariaDB IMDB 电影数据集,我试图解决以下问题。电影表包含 id、名称、排名和年份列 A decade is a sequence of 10 consecutive years
让我从数据开始,以便更好地描述我的需求。我有一个名为 SUPERMARKET 的表,其中包含以下字段: Field 1: StoreID Field 2: ProductCategory Field
你好我有这个查询: SELECT DISTINCT a.id, a.runcd, (SELECT SUM(b.CALVAL) FROM GRS b WHERE b.PCode=11000 AND a.
我想在 xquery 中使用 Group By。有人可以告诉我如何在 Marklogic 中使用 Group By 吗? 最佳答案 或者,您可以使用 xdmp:xslt-invoke 调用 XSLT或
因此,当通过 from sequelize 请求组时,如下所示: return models.WorkingCalendar .findAll({
我希望我解释正确。 我有 2 个表,有 第一个表(table1) +------------+------+-------+-------+ | Date | Item | Block |
我的表 MYTABLE 有 2 列:A 和 B 我有以下代码片段: SELECT MYTABLE.A FROM MYTABLE HAVING SUM(MYTABLE.B) > 100
我有一个简单的行分组查询,需要 0.0045 秒。 300.000 行 从表 GROUP BY cid 中选择 cid 当我添加 MAX() 进行查询时,需要 0.65 秒才能返回。 从表 GROUP
我是一名优秀的程序员,十分优秀!