gpt4 book ai didi

scala - 如何在带有 Spark 的 Scala 中使用 countDistinct?

转载 作者:行者123 更新时间:2023-12-03 15:51:19 27 4
gpt4 key购买 nike

根据 DataBrick's blog,我尝试使用应该在 Spark 1.5 中可用的 countDistinct 函数.但是,我得到以下异常:

Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function countDistinct;

我在 Spark developers' mail list 上发现了这一点他们建议使用 count 和 distinct 函数来获得应由 countDistinct 产生的相同结果:
count(distinct <columnName>)
// Instead
countDistinct(<columnName>)

因为我从聚合函数的名称列表动态构建聚合表达式,所以我希望没有任何需要不同处理的特殊情况。

那么,是否可以通过以下方式统一它:
  • 注册新的 UDAF,它将成为 count(distinct columnName) 的别名
  • 手动注册已经在 Spark CountDistinct 函数中实现,这可能是以下导入中的一个:

    导入 org.apache.spark.sql.catalyst.expressions.{CountDistinctFunction, CountDistinct}
  • 或以任何其他方式做到这一点?

  • 编辑:
    示例(删除了一些本地引用和不必要的代码):
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{Column, SQLContext, DataFrame}
    import org.apache.spark.sql.functions._

    import scala.collection.mutable.ListBuffer


    class Flattener(sc: SparkContext) {
    val sqlContext = new SQLContext(sc)

    def flatTable(data: DataFrame, groupField: String): DataFrame = {
    val flatteningExpressions = data.columns.zip(TypeRecognizer.getTypes(data)).
    flatMap(x => getFlatteningExpressions(x._1, x._2)).toList

    data.groupBy(groupField).agg (
    expr(s"count($groupField) as groupSize"),
    flatteningExpressions:_*
    )
    }

    private def getFlatteningExpressions(fieldName: String, fieldType: DType): List[Column] = {
    val aggFuncs = getAggregationFunctons(fieldType)

    aggFuncs.map(f => expr(s"$f($fieldName) as ${fieldName}_$f"))
    }

    private def getAggregationFunctons(fieldType: DType): List[String] = {
    val aggFuncs = new ListBuffer[String]()

    if(fieldType == DType.NUMERIC) {
    aggFuncs += ("avg", "min", "max")
    }

    if(fieldType == DType.CATEGORY) {
    aggFuncs += "countDistinct"
    }

    aggFuncs.toList
    }

    }

    最佳答案

    countDistinct 可以以两种不同的形式使用:

    df.groupBy("A").agg(expr("count(distinct B)")

    或者
    df.groupBy("A").agg(countDistinct("B"))

    但是,当您希望将它们与自定义 UDAF(在 Spark 1.5 中作为 UserDefinedAggregateFunction 实现)在同一列上使用它们时,这两种方法都不起作用:
    // Assume that we have already implemented and registered StdDev UDAF 
    df.groupBy("A").agg(countDistinct("B"), expr("StdDev(B)"))

    // Will cause
    Exception in thread "main" org.apache.spark.sql.AnalysisException: StdDev is implemented based on the new Aggregate Function interface and it cannot be used with functions implemented based on the old Aggregate Function interface.;

    由于这些限制,看起来最合理的是将 countDistinct 实现为 UDAF,它应该允许以相同的方式处理所有函数以及将 countDistinct 与其他 UDAF 一起使用。

    示例实现可能如下所示:
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    class CountDistinct extends UserDefinedAggregateFunction{
    override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = (buffer.getSeq[String](0).toSet + input.getString(0)).toSeq
    }

    override def bufferSchema: StructType = StructType(
    StructField("items", ArrayType(StringType, true)) :: Nil
    )

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = (buffer1.getSeq[String](0).toSet ++ buffer2.getSeq[String](0).toSet).toSeq
    }

    override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Seq[String]()
    }

    override def deterministic: Boolean = true

    override def evaluate(buffer: Row): Any = {
    buffer.getSeq[String](0).length
    }

    override def dataType: DataType = IntegerType
    }

    关于scala - 如何在带有 Spark 的 Scala 中使用 countDistinct?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33500816/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com