
scala - Add all dates (weeks) between two dates as new rows in Spark Scala


Transform the Spark DataFrame

+----+---------+------+
|name|date     |amount|
+----+---------+------+
|Jhon|4/6/2018 |100   |
|Jhon|4/6/2018 |200   |
|Jhon|4/13/2018|300   |
|Jhon|4/20/2018|500   |
|Lee |5/4/2018 |100   |
|Lee |4/4/2018 |200   |
|Lee |5/4/2018 |300   |
|Lee |4/11/2018|700   |
+----+---------+------+

into the expected DataFrame:

+----+---------+------+
|name|date     |amount|
+----+---------+------+
|Jhon|4/6/2018 |100   |
|Jhon|4/6/2018 |200   |
|Jhon|4/13/2018|100   |
|Jhon|4/13/2018|200   |
|Jhon|4/13/2018|300   |
|Jhon|4/20/2018|100   |
|Jhon|4/20/2018|200   |
|Jhon|4/20/2018|300   |
|Jhon|4/20/2018|500   |
|Lee |5/4/2018 |100   |
|Lee |5/4/2018 |200   |
|Lee |5/4/2018 |300   |
|Lee |5/11/2018|100   |
|Lee |4/11/2018|200   |
|Lee |5/11/2018|300   |
|Lee |4/11/2018|700   |
+----+---------+------+

So here 300 is the value for 04/13/2018, and the 100 and 200 from 04/06/2018 should also show up on 04/13/2018; the same applies to the following Friday dates for the other names. Is there any way to do this in Spark Scala? Any help would be greatly appreciated.

My code only works for the name "Jhon" and the Friday dates "4/6/2018" and "4/13/2018":

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("Excel-read-write").setMaster("local")
  val sc = new SparkContext(conf)
  val sqlc = new org.apache.spark.sql.SQLContext(sc)
  val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
  import ss.sqlContext.implicits._

  var df1 = sqlc.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("oldRecords.csv")
  df1.show(false)
  println("---- df1 row count ----" + df1.count())

  if (df1.count() > 0) {
    for (i <- 0 until (df1.count().toInt) - 1) {
      // duplicate the rows, then shift amount/date by one row, ordered by date
      var df2 = df1.unionAll(df1)
      var w1 = Window.orderBy("date")
      var df3 = df2.withColumn("previousAmount", lag("amount", 1).over(w1))
        .withColumn("newdate", lag("date", 1).over(w1))
      var df4 = df3.filter(df3.col("newdate").isNotNull)
      var df5 = df4.select("name", "amount", "newdate").distinct()
      df5.show(false)
      df1 = df5.withColumnRenamed("newdate", "date")
    }
  }
}

Best Answer

Based on your question, it sounds like you are trying to add all the weekly dates up to the maximum date for each name. Here is what you can do.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.joda.time.LocalDate

// input data (assumes an active SparkSession with spark.implicits._ imported, for toDF and $)
val dataDF = Seq(
  ("Jhon", "4/6/2018", 100),
  ("Jhon", "4/6/2018", 200),
  ("Jhon", "4/13/2018", 300),
  ("Jhon", "4/20/2018", 500),
  ("Lee", "5/4/2018", 100),
  ("Lee", "4/4/2018", 200),
  ("Lee", "5/4/2018", 300),
  ("Lee", "4/11/2018", 700)
).toDF("name", "date", "amount")
  .withColumn("date", to_date($"date", "MM/dd/yyyy"))

val window = Window.partitionBy($"name")

//find the maximum date of each name
val df = dataDF.withColumn("maxDate", max($"date").over(window))
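
As a quick intermediate check (not shown in the original answer), this window gives each name its latest date, which the UDF below uses as the upper bound:

// df.select("name", "maxDate").distinct().show() should print, roughly:
// +----+----------+
// |name|maxDate   |
// +----+----------+
// |Jhon|2018-04-20|
// |Lee |2018-05-04|
// +----+----------+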

Create a UDF to find all the weeks between two dates:

val calculateDate = udf((min: String, max: String) => {
  // collect all the dates, one week apart, from min up to max
  val totalDates = scala.collection.mutable.MutableList[LocalDate]()
  var start = LocalDate.parse(min)
  val end = LocalDate.parse(max)
  while (!start.isAfter(end)) {
    totalDates += start
    start = start.plusWeeks(1)
  }
  totalDates.map(_.toString("MM/dd/yyyy"))
})
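
As a rough standalone check of the same loop (not part of the original answer, and assuming joda-time on the classpath as above), Jhon's earliest date 2018-04-06 and his maxDate 2018-04-20 expand to three Fridays:

// same week-stepping logic as the UDF body, applied to Jhon's date range
val dates = Iterator
  .iterate(LocalDate.parse("2018-04-06"))(_.plusWeeks(1))
  .takeWhile(!_.isAfter(LocalDate.parse("2018-04-20")))
  .map(_.toString("MM/dd/yyyy"))
  .toList
// dates: List("04/06/2018", "04/13/2018", "04/20/2018")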

Now apply the UDF and explode the array it returns:

val finalDf = df.withColumn("date", explode(calculateDate($"date", $"maxDate")))
  .drop("maxDate")

Output:

+----+----------+------+
|name|date |amount|
+----+----------+------+
|Jhon|04/06/2018|100 |
|Jhon|04/13/2018|100 |
|Jhon|04/20/2018|100 |
|Jhon|04/06/2018|200 |
|Jhon|04/13/2018|200 |
|Jhon|04/20/2018|200 |
|Jhon|04/13/2018|300 |
|Jhon|04/20/2018|300 |
|Jhon|04/20/2018|500 |
|Lee |05/04/2018|100 |
|Lee |04/04/2018|200 |
|Lee |04/11/2018|200 |
|Lee |04/18/2018|200 |
|Lee |04/25/2018|200 |
|Lee |05/02/2018|200 |
|Lee |05/04/2018|300 |
|Lee |04/11/2018|700 |
|Lee |04/18/2018|700 |
|Lee |04/25/2018|700 |
|Lee |05/02/2018|700 |
+----+----------+------+
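
If you are on Spark 2.4 or later, a similar expansion can be sketched without a UDF, using the built-in sequence function with a one-week step (an alternative sketch, not part of the original answer; column names match the example above):

// assumes df from above, with a DateType "date" column and a per-name "maxDate"
val finalDf2 = df
  .withColumn("date", explode(expr("sequence(date, maxDate, interval 1 week)")))
  .withColumn("date", date_format($"date", "MM/dd/yyyy"))
  .drop("maxDate")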

Hope this helps!

Regarding scala - add all dates (weeks) between two dates as new rows in Spark Scala, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49850804/
