gpt4 book ai didi

scala - Spark 多个动态聚合函数,countDistinct 不起作用

转载 作者:行者123 更新时间:2023-12-01 12:07:01 25 4
gpt4 key购买 nike

使用多个动态聚合操作在 Spark 数据帧上聚合。

我想使用具有多个动态聚合操作(由用户在 JSON 中传递)的 Scala 对 Spark 数据帧进行聚合。我正在将 JSON 转换为 Map .

下面是一些示例数据:

colA    colB    colC    colD
1 2 3 4
5 6 7 8
9 10 11 12

我正在使用的 Spark 聚合代码:
var cols = ["colA","colB"]
var aggFuncMap = Map("colC"-> "sum", "colD"-> "countDistinct")
var aggregatedDF = currentDF.groupBy(cols.head, cols.tail: _*).agg(aggFuncMap)

我要通过 aggFuncMapMap仅,以便用户可以通过 JSON 配置传递任意数量的聚合。

上面的代码适用于一些聚合,包括 sum , min , max , avgcount .

但是,不幸的是,此代码不适用于 countDistinct (也许是因为它是 Camel 壳?)。

运行上述代码时,我收到此错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Undefined function: 'countdistinct'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'



任何帮助将不胜感激!

最佳答案

当前无法使用 aggcountDistinct里面Map .来自 documentation我们看:

The available aggregate methods are avg, max, min, sum, count.



一个可能的解决方法是更改​​ MapSeq[Column] ,
val cols = Seq("colA", "colB")
val aggFuncs = Seq(sum("colC"), countDistinct("colD"))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

但是如果用户要在配置文件中指定聚合,那将没有多大帮助。

另一种方法是使用 expr ,此函数将评估一个字符串并返回一列。然而, expr不接受 "countDistinct" , 而是 "count(distinct(...))"需要使用。
这可以编码如下:
val aggFuncs = Seq("sum(colC)", "count(distinct(colD))").map(e => expr(e))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

关于scala - Spark 多个动态聚合函数,countDistinct 不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55640686/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com