gpt4 book ai didi

scala - 在Spark数据框内的映射中的结构数组上进行聚合

转载 作者:行者123 更新时间:2023-12-02 01:05:34 37 4
gpt4 key购买 nike

对于冗长的标题,我深表歉意,但我真的无法提出更好的建议。

基本上,我的数据具有以下架构:

 |-- id: string (nullable = true)
|-- mainkey: map (nullable = true)
| |-- key: string
| |-- value: array (valueContainsNull = true)
| | |-- element: struct (containsNull = true)
| | | |-- price: double (nullable = true)
| | | |-- recordtype: string (nullable = true)

让我使用以下示例数据:
{"id":1, "mainkey":{"key1":[{"price":0.01,"recordtype":"BID"}],"key2":[{"price":4.3,"recordtype":"FIXED"}],"key3":[{"price":2.0,"recordtype":"BID"}]}}
{"id":2, "mainkey":{"key4":[{"price":2.50,"recordtype":"BID"}],"key5":[{"price":2.4,"recordtype":"BID"}],"key6":[{"price":0.19,"recordtype":"BID"}]}}

对于上面的两个记录,我想计算记录类型为“BID”时所有价格的均值。因此,对于第一个记录(带有“id”:1),我们有2个这样的出价,价格分别为0.01和2.0,因此四舍五入到小数点后2位的均值为1.01。对于第二个记录(带有“id”:2),有3个出价,价格分别为2.5、2.4和0.19,平均值为1.70。所以我想要以下输出:
+---+---------+
| id|meanvalue|
+---+---------+
| 1| 1.01|
| 2| 1.7|
+---+---------+

下面的代码可以做到这一点:
val exSchema = (new StructType().add("id", StringType).add("mainkey", MapType(StringType, new ArrayType(new StructType().add("price", DoubleType).add("recordtype", StringType), true))))
val exJsonDf = spark.read.schema(exSchema).json("file:///data/json_example")
var explodeExJson = exJsonDf.select($"id",explode($"mainkey")).explode($"value") {
case Row(recordValue: Seq[Row] @unchecked ) => recordValue.map{ recordValue =>
val price = recordValue(0).asInstanceOf[Double]
val recordtype = recordValue(1).asInstanceOf[String]
RecordValue(price, recordtype)
}
}.cache()

val filteredExJson = explodeExJson.filter($"recordtype"==="BID")

val aggExJson = filteredExJson.groupBy("id").agg(round(mean("price"),2).alias("meanvalue"))

问题是它使用“昂贵”的爆炸操作,当我处理大量数据时,尤其是在 map 中可能有很多键时,这成为一个问题。

如果您能想到使用UDF或其他方法的简单解决方案,请告诉我。还请记住,我是Spark的初学者,因此可能错过了一些对您来说显而易见的内容。

任何帮助将非常感激。提前致谢!

最佳答案

如果聚合仅限于一个Row udf将解决此问题:

import org.apache.spark.util.StatCounter
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

val meanPrice = udf((map: Map[String, Seq[Row]]) => {
val prices = map.values
.flatMap(x => x)
.filter(_.getAs[String]("recordtype") == "BID")
.map(_.getAs[Double]("price"))
StatCounter(prices).mean
})

df.select($"id", meanPrice($"mainkey"))

关于scala - 在Spark数据框内的映射中的结构数组上进行聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47686419/

37 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com