gpt4 book ai didi

Scala-Spark 用参数值动态调用 groupby 和 agg

转载 作者:行者123 更新时间:2023-12-02 00:40:31 24 4
gpt4 key购买 nike

我想编写一个自定义分组和聚合函数来获取用户指定的列名称和用户指定的聚合映射。我事先不知道列名称和聚合映射。我想编写一个类似于下面的函数。但我是 Scala 新手,无法解决它。

def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols)
val aggregated = grouped.agg(aggregateFun)
aggregated.show()
}

并想这样调用它

val listOfStrings =  List("A", "B", "C")
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings)

我该怎么做?任何人都可以帮助我吗?

最佳答案

您的代码几乎是正确的 - 有两个问题:

  1. 函数的返回类型是 DataFrame,但最后一行是 aggreated.show(),它返回 Unit 。删除对 show 的调用以返回 aggreated 本身,或者直接返回 agg 的结果

  2. DataFrame.groupBy 期望参数如下:col1: String, cols: String* - 因此您需要传递匹配的参数:第一列,以及然后将其余列作为参数列表,您可以按如下方式执行此操作:df.groupBy(cols.head, cols.tail: _*)

总而言之,您的功能将是:

def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols.head, cols.tail: _*)
val aggregated = grouped.agg(aggregateFun)
aggregated
}

或者,类似的较短版本:

def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame = {
df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}

如果您确实想要在函数中调用show:

def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
val grouped = df.groupBy(cols.head, cols.tail: _*)
val aggregated = grouped.agg(aggregateFun)
aggregated.show()
aggregated
}

关于Scala-Spark 用参数值动态调用 groupby 和 agg,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36307867/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com