gpt4 book ai didi

scala - 如何用 csv 数据解析一个巨大的文件并在普通 Scala 中计算其中一列的平均值?

转载 作者:行者123 更新时间:2023-12-04 07:22:12 25 4
gpt4 key购买 nike

我正在尝试读取一个大文件(逗号分隔值中的数据)
输入文件包含以下格式的数百万行。该文件也接近8GB。
前任:

1, 22, Begin, session1
1, 33, End, session1
2, 20, Begin, session1
2, 30, End, session1
1, 30, Begin, session2
1, 50, End, session2
3, 90, Begin, session1
4, 10, Begin, session1
3, 100, End, session1
4, 20, End, session1
3, 200, OPEN, session2
第一个值是 RECORDID ,第二个值是它的 WEIGHT ,第三个值是 TRANSACTION_STATUS ,第四个值是 SESSIONID .
我必须计算所有 WEIGHT 的平均值每个 RECORDID在其 BEGIN 之间的所有 session 期间& END .
如果有没有 END 的 session ID ,应该被忽略。
示例输出:
RECORD ID => 1, WEIGHTS => (33-22) = 11, (50-30)=20 => Average 15.5 
RECORD ID => 2, WEIGHTS => (30-20) = 10 => Average 10.0
RECORD ID => 3, WEIGHTS => (100-90) = 10 => Average 10.0
RECORD ID => 4, WEIGHTS => (20-10) = 10 => Average 10.0
最终输出:
1, 15.5
2, 10.0
3, 10.0
4, 10.0
我开始像下面这样编码:
case class Users(recordid: Int, weight: Int, transaction_status: String, sessionid: String)
val userList = List[Users]()
val in = new BufferedReader(new InputStreamReader(new FileInputStream("/Users/Desktop/sessionfile.txt")))
Iterator continually in.readLine takeWhile (_ != null) foreach(println)
由于我的输入文件是一个大文件,我使用 InputStreamReader & Iterator 从文件中读取记录。但我在这里有点困惑,因为我之前在 Spark 数据集上进行了上述事件,在那里我创建了我的案例类的对象来表示类型为 Users => Dataset[Users] 的数据集和带有 spark SQL 的 Dataframe
在这种情况下,代码应该只用普通的 Scala 编写,而不使用任何 Spark 或 SQL 的实现。
任何人都可以让我知道在纯 Scala 代码中实现解决方案的有效方法是什么?任何帮助都非常感谢。

最佳答案

这是解决问题的一种方法。

val beginRE = raw"\s*(\d+)\s*,\s*(\d+)\s*,\s*Begin.*".r
val endRE = raw"\s*(\d+)\s*,\s*(\d+)\s*,\s*End.*".r

util.Using(io.Source.fromFile("./inFile.csv")){
_.getLines().foldLeft(Map[String,(Long,Int,Int)]()){
case (acc, beginRE(recID, wght)) => //Begin record
val (rt, cnt, _) = acc.getOrElse(recID,(0L,0,0))
acc + (recID -> (rt, cnt, wght.toInt))
case (acc, endRE(recID, wght)) => //End record
val (rt, cnt, bgn) = acc.getOrElse(recID,(0L,0,-1))
if (bgn < 0) {
println(s"orphan End record: '$recID,$wght,...'")
acc
} else
acc + (recID -> (rt + wght.toInt - bgn, cnt+1, -1))
case (acc, rec) => //bad record
println(s"bad record: $rec")
acc
}
}.map(_.map{case (k,(rt,cnt,_)) => k -> rt/cnt.toDouble})
//res0: Try[Map[String,Double]] =
// Success(Map(1 -> 15.5, 2 -> 10.0, 3 -> 10.0, 4 -> 10.0))
如果有多个 Begin只记录最后一项。其余的被忽略。
如果有多个 End只记录第一个计数。其余的被报告为“孤儿”。

关于scala - 如何用 csv 数据解析一个巨大的文件并在普通 Scala 中计算其中一列的平均值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68423543/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com