gpt4 book ai didi

scala - Spark UDAF 与 ArrayType 作为 bufferSchema 性能问题

转载 作者:行者123 更新时间:2023-12-01 08:23:08 25 4
gpt4 key购买 nike

我正在研究一个返回元素数组的 UDAF。

每次更新的输入是索引和值的元组。

UDAF 所做的是将同一索引下的所有值相加。

例子:

对于输入(索引,值):(2,1),(3,1),(2,3)

应该返回 (0,0,4,1,...,0)

逻辑工作正常,但我的 更新方法 有问题,我的实现仅 为每行 更新 1 个单元格,但该方法中的最后一个分配实际上 复制了整个数组 - 这是冗余且非常耗时消耗。

仅此分配就负责 98% 的查询执行时间

我的问题是,我怎样才能减少那个时间?是否可以在缓冲区数组中分配 1 个值而不必替换整个缓冲区?

P.S.:我正在使用 Spark 1.6,我无法很快升级它,所以请坚持使用适用于此版本的解决方案。

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{

val bucketSize = 1000

def inputSchema: StructType = StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

def dataType: DataType = ArrayType(LongType)

def deterministic: Boolean = true

def bufferSchema: StructType = {
StructType(
StructField("buckets", ArrayType(LongType)) :: Nil
)
}

override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = new Array[Long](bucketSize)
}

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val index = input.getLong(0)
val value = input.getLong(1)

val arr = buffer.getAs[mutable.WrappedArray[Long]](0)

buffer(0) = arr // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
}

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)

for(i <- arr1.indices){
arr1.update(i, arr1(i) + arr2(i))
}

buffer1(0) = arr1
}

override def evaluate(buffer: Row): Any = {
buffer.getAs[mutable.WrappedArray[Long]](0)
}
}

最佳答案

TL;DR 要么不使用 UDAF 要么使用原始类型代替 ArrayType

没有 UserDefinedFunction

这两种解决方案都应该跳过内部和外部表示之间的昂贵杂耍。

使用标准聚合和 pivot

这使用标准 SQL 聚合。虽然在内部进行了优化,但当键的数量和数组的大小增加时,它可能会很昂贵。

给定输入:

val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")

你可以:
import org.apache.spark.sql.functions.{array, coalesce, col, lit}

val nBuckets = 10
@transient val values = array(
0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)

df
.groupBy("id")
.pivot("index", 0 until nBuckets)
.sum("value")
.select($"id", values.alias("values"))
+---+--------------------+                                                      
| id| values|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

通过 combineByKey/aggregateByKey 使用 RDD API。

带有可变缓冲区的普通旧 byKey 聚合。没有花里胡哨,但在广泛的输入范围内应该表现得相当好。如果您怀疑输入是稀疏的,您可以考虑更有效的中间表示,如可变 Map
rdd
.aggregateByKey(Array.fill(nBuckets)(0L))(
{ case (acc, (index, value)) => { acc(index) += value; acc }},
(acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
).toDF

+---+--------------------+
| _1| _2|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

UserDefinedFunction 与原始类型 一起使用

据我了解内部结构,性能瓶颈是 ArrayConverter.toCatalystImpl

看起来每次调用 MutableAggregationBuffer.update 都会调用它,然后为每个 GenericArrayData 分配新的 Row

如果我们将 bufferSchema 重新定义为:
def bufferSchema: StructType = {
StructType(
0 to nBuckets map (i => StructField(s"x$i", LongType))
)
}
updatemerge 都可以表示为缓冲区中原始值的简单替换。调用链会保持很长,但是 it won't require copies / conversions 和疯狂的分配。省略 null 检查,您将需要类似于
val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))


for(i <- 0 to nBuckets){
buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}

分别。

最后 evaluate 应该采用 Row 并将其转换为输出 Seq :
 for (i <- 0 to nBuckets)  yield buffer.getLong(i)

请注意,在这个实现中,一个可能的瓶颈是 merge 。虽然它不应该引入任何新的性能问题,但对于 M 个存储桶,对 merge 的每次调用都是 O(M)。

使用 K 个唯一键和 P 个分区,在最坏的情况下,它将被调用 M * K 次,其中每个键在每个分区上至少出现一次。这有效地将 merge 组件的复杂性增加到 O(M * N * K)。

一般来说,您对此无能为力。但是,如果您对数据分布做出特定假设(数据稀疏, key 分布均匀),您可以稍微缩短一些事情,然后先洗牌:
df
.repartition(n, $"key")
.groupBy($"key")
.agg(SumArrayAtIndexUDAF($"index", $"value"))

如果满足假设,它应该:
  • 通过对稀疏对进行混洗而不是密集的类似数组的 Rows 来减少混洗大小。
  • 仅使用更新(每个 O(1))聚合数据,可能仅作为索引的子集接触。

  • 但是,如果一个或两个假设不满足,您可以预期 shuffle 大小会增加,而更新数量将保持不变。同时,数据偏差会使事情比 update - shuffle - merge 场景更糟。

    Aggregator 与“强”类型的 Dataset 一起使用:
    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.{Encoder, Encoders}

    class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int) extends Aggregator[I, Array[Long], Seq[Long]]
    with Serializable {
    def zero = Array.fill(bucketSize)(0L)
    def reduce(acc: Array[Long], x: I) = {
    val (i, v) = f(x)
    acc(i) += v
    acc
    }

    def merge(acc1: Array[Long], acc2: Array[Long]) = {
    for {
    i <- 0 until bucketSize
    } acc1(i) += acc2(i)
    acc1
    }

    def finish(acc: Array[Long]) = acc.toSeq

    def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
    def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
    }

    可以如下所示使用
    val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS

    ds
    .groupByKey(_._1)
    .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
    .show(false)

    +-----+-------------------------------+
    |value|SumArrayAtIndex(scala.Tuple2) |
    +-----+-------------------------------+
    |1 |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
    |2 |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
    +-----+-------------------------------+

    注意 :

    另请参阅 SPARK-27296 - 用户定义的聚合函数 (UDAF) 存在主要的效率问题

    关于scala - Spark UDAF 与 ArrayType 作为 bufferSchema 性能问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47293454/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com