gpt4 book ai didi

apache-spark - Spark Sql Dedup 行

转载 作者:行者123 更新时间:2023-12-04 04:12:08 25 4
gpt4 key购买 nike

我们有一个常见的用例,即按行的创建顺序对表进行重复数据删除。

例如,我们有用户操作的事件日志。用户不时标记他最喜欢的类别。
在我们的分析阶段,我们只想知道用户最后喜欢的类别。

示例数据:

id  action_type value date 
123 fav_category 1 2016-02-01
123 fav_category 4 2016-02-02
123 fav_category 8 2016-02-03
123 fav_category 2 2016-02-04

我们只想根据日期列获取最近的更新。我们当然可以在 sql 中做到这一点:
select * from (
select *, row_number() over (
partition by id,action_type order by date desc) as rnum from tbl
)
where rnum=1;

但是,它不会在映射器端部分聚合,我们会将所有数据混洗到 reducer 。

我已经发布了一个关于这个问题的 Jira SPARK-17662它以更好的 SQL 风格建议结束:
select id,
action_type,
max(struct(date, *)) last_record
from tbl
group by id,action_type

虽然这个解决方案更干净,但仍然存在两个问题:
  • 如果其中一个字段不可排序(例如 map<>)
  • ,则此技巧不起作用
  • 如果稍后在流程中我们只选择了一些字段,我们将不会得到下推谓词来优化我们的流程并从一开始就忽略不需要的字段。

  • 我们最终为此编写了一个 UDAF,它克服了问题 #1,但仍然受到问题 #2 的困扰。

    有没有人有更好的解决方案的想法?

    最佳答案

    对于任何想要我们当前解决方案的人。这是 UDAF 的代码 - 请注意,我们必须使用一些内部函数,因此我们位于包 org.apache.spark.sql.types 中:

    package org.apache.spark.sql.types

    case class MaxValueByKey(child1: Expression, child2: Expression) extends DeclarativeAggregate {

    override def children: Seq[Expression] = child1 :: child2 :: Nil

    override def nullable: Boolean = true

    // Return data type.
    override def dataType: DataType = child2.dataType

    // Expected input data type.
    override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, AnyDataType)

    override def checkInputDataTypes(): TypeCheckResult =
    TypeUtils.checkForOrderingExpr(child1.dataType, "function max")

    private lazy val max = AttributeReference("max", child1.dataType)()
    private lazy val data = AttributeReference("data", child2.dataType)()

    override lazy val aggBufferAttributes: Seq[AttributeReference] = max :: data :: Nil

    override lazy val initialValues: Seq[Expression] = Seq(
    Literal.create(null, child1.dataType),
    Literal.create(null, child2.dataType)
    )

    override lazy val updateExpressions: Seq[Expression] =
    chooseKeyValue(max, data, child1, child2)

    override lazy val mergeExpressions: Seq[Expression] =
    chooseKeyValue(max.left, data.left, max.right, data.right)

    def chooseKeyValue(key1:Expression, value1: Expression, key2:Expression, value2: Expression) = Seq(
    If(IsNull(key1), key2, If(IsNull(key2), key1, If(GreaterThan(key1, key2), key1, key2))),
    If(IsNull(key1), value2, If(IsNull(key2), value1, If(GreaterThan(key1, key2), value1, value2)))
    )

    override lazy val evaluateExpression: AttributeReference = data
    }

    object SparkMoreUDAFs {
    def maxValueByKey(key: Column, value: Column): Column =
    Column(MaxValueByKey(key.expr, value.expr).toAggregateExpression(false))
    }

    用法是:
    sqlContext.table("tbl").groupBy($"id",$"action_type")
    .agg(SparkMoreUDAFs.maxValueByKey($"date", expr("struct(date,*)")).as("s"))

    我不确定它是否非常优雅,但它执行 map 端部分聚合并适用于所有列类型。此外,我认为这个 UDAF 本身也很有用。

    希望它会帮助某人..

    关于apache-spark - Spark Sql Dedup 行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41143001/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com