
scala - Dropping the first few values when computing a moving average with Spark window functions

Reposted. Author: 行者123. Updated: 2023-12-01 12:17:32

I am trying to compute a quarterly moving average of a column, grouped by name, and I have defined the Spark window spec as:

val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-2, 0)
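Why the first two rows of each group come out "inaccurate": `rowsBetween(-2, 0)` frames each row with itself plus up to two preceding rows of the same partition, so the first row of a partition is averaged over one value and the second over two. The plain-Scala sketch below (no Spark; `TrailingWindowDemo` and `trailingFrames` are hypothetical names used only for illustration) reproduces those frames for Bob's rows, assuming they are already sorted by date:

```scala
// Plain-Scala illustration (no Spark) of the frame produced by rowsBetween(-2, 0).
// TrailingWindowDemo and trailingFrames are hypothetical names.
object TrailingWindowDemo {
  // For row i, the frame is rows max(0, i - preceding) .. i (inclusive),
  // so the first rows of a partition get shorter frames.
  def trailingFrames[A](rows: Seq[A], preceding: Int): Seq[Seq[A]] =
    rows.indices.map(i => rows.slice(math.max(0, i - preceding), i + 1))

  def main(args: Array[String]): Unit = {
    // Bob's amountSpent values, assumed already ordered by date
    val bob = Seq(25.0, 25.0, 25.0, 29.0, 27.0)
    for (frame <- trailingFrames(bob, preceding = 2)) {
      val avg = frame.sum / frame.size
      println(s"frame=${frame.mkString("[", ", ", "]")} size=${frame.size} avg=$avg")
    }
  }
}
```

The frame sizes come out as 1, 2, 3, 3, 3: only from the third row onward does the average cover a full three-row window.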

My DataFrame looks like this:


+-----+----------+-----------+------------------+
| name|      date|amountSpent|         movingAvg|
+-----+----------+-----------+------------------+
|  Bob|2016-01-01|       25.0|              25.0|
|  Bob|2016-02-02|       25.0|              25.0|
|  Bob|2016-03-03|       25.0|              25.0|
|  Bob|2016-04-04|       29.0|26.333333333333332|
|  Bob|2016-05-06|       27.0|              27.0|
|Alice|2016-01-01|       50.0|              50.0|
|Alice|2016-02-03|       45.0|              47.5|
|Alice|2016-03-04|       55.0|              50.0|
|Alice|2016-04-05|       60.0|53.333333333333336|
|Alice|2016-05-06|       65.0|              60.0|
+-----+----------+-----------+------------------+

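In each name group, the first accurately computed value is the one on the third row; the first two rows are averaged over fewer than three values. I want to replace those first two values with a placeholder such as NULL. Given my limited knowledge of Spark/Scala, I considered extracting this column from the DataFrame and using Scala's patch function. However, I cannot figure out how to replace the values repeatedly, for example again at the start of the second name group. Here is my code: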

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object Test {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("Test")
      .config("spark.cassandra.connection.host", "localhost")
      .config("spark.driver.host", "localhost")
      .getOrCreate()
    val sc = sparkSession.sparkContext
    // Needed for .toDF on the RDD of tuples
    import sparkSession.implicits._

    // Sample data: (name, date, amountSpent)
    val customers = sc.parallelize(List(
      ("Alice", "2016-01-01", 50.00),
      ("Alice", "2016-02-03", 45.00),
      ("Alice", "2016-03-04", 55.00),
      ("Alice", "2016-04-05", 60.00),
      ("Alice", "2016-05-06", 65.00),
      ("Bob", "2016-01-01", 25.00),
      ("Bob", "2016-02-02", 25.00),
      ("Bob", "2016-03-03", 25.00),
      ("Bob", "2016-04-04", 29.00),
      ("Bob", "2016-05-06", 27.00))).toDF("name", "date", "amountSpent")

    // Window spec: per name, ordered by date, current row plus the two preceding rows
    val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-2, 0)

    val ls = customers.withColumn("movingAvg", avg(customers("amountSpent")).over(wSpec1))
    ls.show()
  }
}
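The `patch` idea from the question can work outside Spark if it is applied to each name group separately rather than to the whole extracted column. Below is a minimal plain-Scala sketch, assuming the data fits in driver memory; `Sale`, `trailingAvg` and `movingAvgByName` are hypothetical names, while `Seq.patch(from, other, replaced)` is the standard-library method:

```scala
// Plain-Scala sketch (no Spark) of the per-group "patch" idea.
// Sale, trailingAvg and movingAvgByName are hypothetical names.
object PatchPerGroupDemo {
  final case class Sale(name: String, date: String, amountSpent: Double)

  // Average over the current element and up to `preceding` earlier elements.
  def trailingAvg(xs: Seq[Double], preceding: Int): Seq[Double] =
    xs.indices.map { i =>
      val frame = xs.slice(math.max(0, i - preceding), i + 1)
      frame.sum / frame.size
    }

  // For each name: sort by date, compute trailing averages, then use
  // Seq.patch to overwrite the first `preceding` entries with None.
  def movingAvgByName(sales: Seq[Sale], preceding: Int): Map[String, Seq[Option[Double]]] =
    sales.groupBy(_.name).map { case (name, rows) =>
      val avgs: Seq[Option[Double]] =
        trailingAvg(rows.sortBy(_.date).map(_.amountSpent), preceding).map(Some(_))
      val k = math.min(preceding, avgs.size) // guard groups shorter than the window
      name -> avgs.patch(0, Seq.fill(k)(None), k)
    }

  def main(args: Array[String]): Unit = {
    val sales = Seq(
      Sale("Alice", "2016-01-01", 50.0), Sale("Alice", "2016-02-03", 45.0),
      Sale("Alice", "2016-03-04", 55.0), Sale("Alice", "2016-04-05", 60.0),
      Sale("Alice", "2016-05-06", 65.0), Sale("Bob", "2016-01-01", 25.0),
      Sale("Bob", "2016-02-02", 25.0), Sale("Bob", "2016-03-03", 25.0),
      Sale("Bob", "2016-04-04", 29.0), Sale("Bob", "2016-05-06", 27.0)
    )
    // Sort by name only to make the printed order deterministic
    for ((name, avgs) <- movingAvgByName(sales, preceding = 2).toSeq.sortBy(_._1))
      println(s"$name: ${avgs.mkString(", ")}")
  }
}
```

Doing this on the driver gives up Spark's parallelism, so for large data a window-based approach that stays inside the DataFrame is usually preferable.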

Best answer

I suggest computing the average only when the window contains exactly 3 rows (i.e. it spans the full range -2 to 0):

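val ls = customers
  // Number of rows in each frame: 1 and 2 for the first rows of a partition, then 3
  .withColumn("count", count($"amountSpent").over(wSpec1))
  // when(...) without otherwise(...) leaves movingAvg null for incomplete frames
  .withColumn("movingAvg", when($"count" === 3, avg($"amountSpent").over(wSpec1)))

ls.show()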


+-----+----------+-----------+-----+------------------+
| name|      date|amountSpent|count|         movingAvg|
+-----+----------+-----------+-----+------------------+
|  Bob|2016-01-01|       25.0|    1|              null|
|  Bob|2016-02-02|       25.0|    2|              null|
|  Bob|2016-03-03|       25.0|    3|              25.0|
|  Bob|2016-04-04|       29.0|    3|26.333333333333332|
|  Bob|2016-05-06|       27.0|    3|              27.0|
|Alice|2016-01-01|       50.0|    1|              null|
|Alice|2016-02-03|       45.0|    2|              null|
|Alice|2016-03-04|       55.0|    3|              50.0|
|Alice|2016-04-05|       60.0|    3|53.333333333333336|
|Alice|2016-05-06|       65.0|    3|              60.0|
+-----+----------+-----------+-----+------------------+
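The same guard can be modelled in plain Scala to show what the two columns hold for a single partition. This is an illustrative sketch only (`CountGuardDemo` and `guardedMovingAvg` are hypothetical names): `count` is the number of rows in the frame, and, as with `when(...)` without `otherwise(...)`, an incomplete frame yields no value, modelled as `None` in place of Spark's `null`.

```scala
// Plain-Scala model of the answer's count guard for one sorted partition.
// CountGuardDemo and guardedMovingAvg are hypothetical names.
object CountGuardDemo {
  // For each row returns (count, movingAvg):
  //   count     ~ count(...).over(rowsBetween(-(windowSize - 1), 0))
  //   movingAvg ~ when(count === windowSize, avg(...).over(...)), None standing in for null
  def guardedMovingAvg(xs: Seq[Double], windowSize: Int): Seq[(Int, Option[Double])] =
    xs.indices.map { i =>
      val frame = xs.slice(math.max(0, i - (windowSize - 1)), i + 1)
      val count = frame.size
      val avg = if (count == windowSize) Some(frame.sum / count) else None
      (count, avg)
    }

  def main(args: Array[String]): Unit = {
    // Alice's amountSpent values, assumed already ordered by date
    val alice = Seq(50.0, 45.0, 55.0, 60.0, 65.0)
    guardedMovingAvg(alice, windowSize = 3).foreach { case (count, avg) =>
      println(s"count=$count movingAvg=${avg.getOrElse("null")}")
    }
  }
}
```

One caveat on the Spark version: `count(col)` skips null values, so a row with a null `amountSpent` would also make its frame look incomplete. If nulls are possible, counting a literal such as `count(lit(1))` counts rows regardless of their values.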

Regarding "scala - Dropping the first few values when computing a moving average with Spark window functions", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47052723/
