gpt4 book ai didi

scala - Spark 中的数据操作

转载 作者:行者123 更新时间:2023-12-01 10:19:56 26 4
gpt4 key购买 nike

我是 spark 的新手,我正在尝试根据计数实现某种数据操作 - 问题是这样的 - 我有一个包含如下信息的文本文件 -

john, apple 
john, apple
john, orange
jill, apple
jill, orange
jill, orange

我想做的很简单——我想计算每个人每个水果出现的次数,然后用这个数字除以两人水果的总数——所以结果看起来像这样:

john, apple, 2, 3
jill, apple, 1, 3
john, orange, 1, 3
jill orange, 2, 3

然后我可以将第 3 行除以第 4 行以获得最终产品 -

john, apple, 2, 3, 2/3
jill, apple, 1, 3, 1/3
john, orange, 1, 3, 1/3
jill orange, 2, 3, 2/3

我在 scala 中尝试过一些像这样的东西 -

var persons = sc.textFile("path_to_directory").map(_.split(",")).map(x=>(x(0),x(1)))
persons.map{case(person, fruit)=>((person, fruit), 1)}.reduceByKey(_+_).collect

此输出提供 -

((jill,orange),2)
((jill,apple),1)
((john,orange),1)
((john,apple),2)

这似乎是一个好的开始,但我不知道如何从这里开始。任何帮助或提示将不胜感激!

更新:

我有一个针对这个问题的建议解决方案 -

var persons = sc.textFile("path_to_directory").map(_.split(",")).map(x=>(x(0),x(1)))

var count = persons.map{case(name, fruit)=>((name,fruit),1)}.reduceByKey(_+_)

var total = persons.map{case(name, fruit)=>(fruit,1)}.reduceByKey(_+_)

var fruit = count.map{case((name, fruit), count)=>(fruit, (name, count))}

fruit.join(total).map{case((fruit,((name, count), total)))=>(name, fruit, count, total, count.toDouble/total.toDouble)}.collect.foreach(println)

这个 scala 代码在 spark 中的输出是 -

(jill,orange,2,3,0.6666666666666666)
(john,orange,1,3,0.3333333333333333)
(jill,apple,1,3,0.3333333333333333)
(john,apple,2,3,0.6666666666666666)

最佳答案

一种可能的解决方案:

def getFreqs(x: String, vals: Iterable[String]) = {
val counts = vals.groupBy(identity).mapValues(_.size)
val sum = counts.values.sum.toDouble
counts.map { case (k, v) => (x, k, v, sum.toInt, v / sum) }
}

persons.groupByKey.flatMap { case(k, v) => getFreqs(k, v) }

还有一个:

val fruitsPerPerson = sc.broadcast(persons.countByKey)

persons.groupBy(identity).map { case (k, v) => {
val sum: Float = fruitsPerPerson.value.get(k._1) match {
case Some(x) => x
case _ => 1
}
(k._1, k._2, v.size, sum.toInt, v.size / sum)
}}

groupByKeygroupBy 都可能效率很低,因此如果您正在寻找更强大的解决方案,您可以考虑使用 combineByKey:

def create(value: String) = Map(value -> 1)

def mergeVals(x: Map[String, Int], value: String) = {
val count = x.getOrElse(value, 0) + 1
x ++ Map(value -> count)
}

def mergeCombs(x: Map[String, Int], y: Map[String, Int]) = {
val keys = x.keys ++ y.keys
keys.map((k: String) => (k -> (x.getOrElse(k, 0) + y.getOrElse(k, 0)))).toMap
}

val counts = persons.combineByKey(create, mergeVals, mergeCombs)

counts.flatMap { case (x: String, counts: Map[String, Int]) => {
val sum = counts.values.sum.toDouble
counts.map { case (k: String, v: Int) => (x, k, v, sum.toInt, v / sum) }
}}

关于scala - Spark 中的数据操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31035547/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com