gpt4 book ai didi

scala - 如何根据 spark 数据框中值的累计和为每一行分配一个类别?

转载 作者:行者123 更新时间:2023-12-04 17:15:20 24 4
gpt4 key购买 nike

我有一个 spark 数据框,其中包含两列 [Employee 和 Salary],其中薪水按升序排列。

示例数据框

Expected Output: 
| Employee |salary |
| -------- | ------|
| Emp1 | 10 |
| Emp2 | 20 |
| Emp3 | 30 |
| EMp4 | 35 |
| Emp5 | 36 |
| Emp6 | 50 |
| Emp7 | 70 |

我想对行进行分组,使每个组的聚合值少于 80,并为每个组分配一个类别,如下所示。我会不断地逐行添加薪水,直到总和超过 80。一旦超过 80,我就会分配一个新类别。

Expected Output: 
| Employee |salary | Category|
| -------- | ------|----------
| Emp1 | 10 |A |
| Emp2 | 20 |A |
| Emp3 | 30 |A |
| EMp4 | 35 |B |
| Emp5 | 36 |B |
| Emp6 | 50 |C |
| Emp7 | 70 |D |

有没有我们可以在 spark scala 中执行此操作的简单方法?

最佳答案

要解决您的问题,您可以使用自定义 aggregate functionwindow

首先,您需要创建自定义聚合函数。聚合函数由累加器(缓冲区)定义,它将被初始化(值)并在处理新行时更新(reduce > 函数)或遇到另一个累加器(merge 函数)。最后,返回累加器(finish 函数)

在您的情况下,累加器应保留两条信息:

  • 当前员工类别
  • 属于当前类别的以前员工的工资总和

要存储这些信息,您可以使用元组 (Int, Int),第一个元素是当前类别,第二个元素是当前类别以前雇员的工资总和:

  • 您使用 (0, 0) 初始化此元组。
  • 当你遇到一个新行时,如果以前的薪水和当前行的薪水之和超过 80,你增加类别并用当前行的薪水重新初始化以前的薪水,否则你将当前行的薪水添加到以前的薪水'总和。
  • 由于您将使用窗口函数,因此您将按顺序处理行,因此您不需要实现与另一个累加器的合并。
  • 最后,由于您只需要类别,因此您只返回累加器的第一个元素。

因此我们得到以下聚合器实现:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

object Labeler extends Aggregator[Int, (Int, Int), Int] {
override def zero: (Int, Int) = (0, 0)

override def reduce(catAndSum: (Int, Int), salary: Int): (Int, Int) = {
if (catAndSum._2 + salary > 80)
(catAndSum._1 + 1, salary)
else
(catAndSum._1, catAndSum._2 + salary)
}

override def merge(catAndSum1: (Int, Int), catAndSum2: (Int, Int)): (Int, Int) = {
throw new NotImplementedError("should be used only over a windows function")
}

override def finish(catAndSum: (Int, Int)): Int = catAndSum._1

override def bufferEncoder: Encoder[(Int, Int)] = Encoders.tuple(Encoders.scalaInt, Encoders.scalaInt)

override def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

一旦有了聚合器,就可以使用 udaf 函数将其转换为 spark 聚合函数。

然后您在所有数据框上创建您的窗口并按薪水排序,并在此窗口上应用您的 spark 聚合函数:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, udaf}

val labeler = udaf(Labeler)
val window = Window.orderBy("salary")

val result = dataframe.withColumn("category", labeler(col("salary")).over(window))

使用您的示例作为输入数据框,您将获得以下结果数据框:

+--------+------+--------+
|employee|salary|category|
+--------+------+--------+
|Emp1 |10 |0 |
|Emp2 |20 |0 |
|Emp3 |30 |0 |
|Emp4 |35 |1 |
|Emp5 |36 |1 |
|Emp6 |50 |2 |
|Emp7 |70 |3 |
+--------+------+--------+

关于scala - 如何根据 spark 数据框中值的累计和为每一行分配一个类别?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68799179/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com