
json - Scala: How to do GroupBy sum for String values?

Reposted. Author: 行者123. Updated: 2023-12-04 15:31:47

I have an RDD[Row]:

| itemId | Country | Type  |
|--------|---------|-------|
| 11     | US      | Movie |
| 11     | US      | TV    |
| 101    | France  | Movie |

How can I GroupBy itemId so that I can save the result as a list of JSON objects, where each line is a separate JSON object (one per row of the resulting RDD):
{"itemId": 11, "Country": {"US": 2}, "Type": {"Movie": 1, "TV": 1}},
{"itemId": 101, "Country": {"France": 1}, "Type": {"Movie": 1}}

What I have tried:
import com.mapping.data.model.MappingUtils
import com.mapping.data.model.CountryInfo


val mappingPath = "s3://.../"
val input = sc.textFile(mappingPath)

The input is a list of JSON documents, one per line. I map each line to the POJO class CountryInfo using MappingUtils, which takes care of JSON parsing and conversion:
val MappingsList = input.map(x => {
  val countryInfo = MappingUtils.getCountryInfoString(x)
  (countryInfo.getItemId(), countryInfo)
}).collectAsMap

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo]


def showCountryInfo(x: Option[CountryInfo]) = x match {
case Some(s) => s
}


val events = sqlContext.sql("select itemId from EventList")

val itemList = events.map(row => {
  // look the column up by name rather than by position
  val itemId = row.getAs[String]("itemId")
  val countryInfo = showCountryInfo(MappingsList.get(itemId))
  val country = if (countryInfo.getCountry() == "unknown") "US" else countryInfo.getCountry()
  // `type` is a reserved word in Scala, so the value is named tpe
  val tpe = countryInfo.getType()

  Row(itemId, country, tpe)
})

Can someone let me know how to achieve this?

Thank you!

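Best answer

I can't afford the extra time to complete this, but I can give you a start.

The idea is that you aggregate the RDD[Row] down into a single Map that represents your JSON structure. Aggregation is a fold that takes two function parameters: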

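  • seqOp: how to fold a collection of elements into the target type
  • combOp: how to merge two values of the target type

The tricky part is in combOp: when merging, you need to accumulate the counts of the values seen in seqOp. I have left this as an exercise, as I have a plane to catch! If you run into trouble, hopefully someone else can fill in the gaps.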
    import org.apache.spark.rdd.RDD

    case class Row(id: Int, country: String, tpe: String)

    def foo: Unit = {

      val rows: RDD[Row] = ???

      // seqOp: fold one Row into the accumulated summary
      def seqOp(acc: Map[Int, (Map[String, Int], Map[String, Int])], r: Row) = {
        acc.get(r.id) match {
          case None => acc.updated(r.id, (Map(r.country -> 1), Map(r.tpe -> 1)))
          case Some((countries, types)) =>
            val countries_ = countries.updated(r.country, countries.getOrElse(r.country, 0) + 1)
            val types_ = types.updated(r.tpe, types.getOrElse(r.tpe, 0) + 1)
            acc.updated(r.id, (countries_, types_))
        }
      }

      val z = Map.empty[Int, (Map[String, Int], Map[String, Int])]

      // combOp: merge two partial summaries; starting the fold from r
      // keeps the ids that only occur in r
      def combOp(l: Map[Int, (Map[String, Int], Map[String, Int])], r: Map[Int, (Map[String, Int], Map[String, Int])]) = {
        l.foldLeft(r) { case (acc, (id, (countries, types))) =>
          r.get(id) match {
            case None => acc.updated(id, (countries, types))
            case Some((otherCountries, otherTypes)) =>
              // todo - continue by merging countries with otherCountries
              // and types with otherTypes, then update acc
              ???
          }
        }
      }

      val summaryMap = rows.aggregate(z)(seqOp, combOp)
    }
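
One possible way to fill in the merging step that the answer leaves open is sketched below. It is not the answerer's code: the names ItemRow, GroupCounts, bump, mergeCounts and toJson are hypothetical, two plain Scala lists stand in for RDD partitions so the sketch runs without Spark, and the JSON rendering does no string escaping.

```scala
// Hypothetical stand-in for the answer's Row case class.
final case class ItemRow(id: Int, country: String, tpe: String)

object GroupCounts {
  type Counts  = Map[String, Int]
  type Summary = Map[Int, (Counts, Counts)]

  val zero: Summary = Map.empty
  private val emptyPair: (Counts, Counts) = (Map.empty, Map.empty)

  // Add one occurrence of `key` to a count map.
  def bump(counts: Counts, key: String): Counts =
    counts.updated(key, counts.getOrElse(key, 0) + 1)

  // seqOp: fold a single row into the running summary.
  def seqOp(acc: Summary, r: ItemRow): Summary = {
    val (countries, types) = acc.getOrElse(r.id, emptyPair)
    acc.updated(r.id, (bump(countries, r.country), bump(types, r.tpe)))
  }

  // Merge two count maps by summing the counts of shared keys.
  def mergeCounts(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (k, n)) => acc.updated(k, acc.getOrElse(k, 0) + n) }

  // combOp: merge two partial summaries, keeping ids from either side.
  def combOp(l: Summary, r: Summary): Summary =
    r.foldLeft(l) { case (acc, (id, (countries, types))) =>
      val (c0, t0) = acc.getOrElse(id, emptyPair)
      acc.updated(id, (mergeCounts(c0, countries), mergeCounts(t0, types)))
    }

  // Render one entry as a single-line JSON object (sketch only, no escaping).
  def toJson(id: Int, countries: Counts, types: Counts): String = {
    def obj(m: Counts): String =
      m.toSeq.sortBy(_._1).map { case (k, n) => "\"" + k + "\": " + n }.mkString("{", ", ", "}")
    "{\"itemId\": " + id + ", \"Country\": " + obj(countries) + ", \"Type\": " + obj(types) + "}"
  }

  def main(args: Array[String]): Unit = {
    // Two lists stand in for two RDD partitions.
    val partitions = List(
      List(ItemRow(11, "US", "Movie"), ItemRow(101, "France", "Movie")),
      List(ItemRow(11, "US", "TV"))
    )
    // Per-partition fold with seqOp, then merge the partial results with combOp.
    val summary = partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)
    summary.toSeq.sortBy(_._1).foreach { case (id, (c, t)) => println(toJson(id, c, t)) }
  }
}
```

With a real RDD of such rows, the same two functions should fit the call rows.aggregate(GroupCounts.zero)(GroupCounts.seqOp, GroupCounts.combOp). Note that aggregate brings the whole summary map back to the driver, so this only suits cases where the number of distinct itemIds is small enough to fit in memory.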

Regarding "json - Scala: How to do GroupBy sum for String values?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/39454277/
