gpt4 book ai didi

scala - 如何从scala中的RDD中获取最早的时间戳日期

转载 作者:行者123 更新时间:2023-12-03 23:27:23 24 4
gpt4 key购买 nike

我有一个类似于 ((String, String), TimeStamp) 的 RDD。我有大量记录,我想为每个键选择 具有最新 TimeStamp 值的记录。我已经尝试了以下代码,但仍在努力解决这个问题。有人可以帮我做这个吗?

我尝试的以下代码是错误的,并且不能正常工作

val context = sparkSession.read.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", url)
.option("dbtable", "student_risk")
.option("user", "user")
.option("password", "password")
.load()
context.cache();

val studentRDD = context.rdd.map(r => ((r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id"))), r.getTimestamp(r.fieldIndex("risk_date_time"))))
val filteredRDD = studentRDD.collect().map(z => (z._1, z._2)).reduce((x, y) => (x._2.compareTo(y._2)))

最佳答案

直接在DataFrame上很容易做到(这里奇怪地命名为context):

val result = context
.groupBy("course_id", "student_id")
.agg(min("risk_date_time") as "risk_date_time")

然后您可以像以前一样将其转换为 RDD(如果需要) - 结果具有相同的架构。

如果你确实想在 RDD 上执行此操作,请使用 reduceByKey:

studentRDD.reduceByKey((t1, t2) => if (t1.before(t2)) t1 else t2)

关于scala - 如何从scala中的RDD中获取最早的时间戳日期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42268827/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com