gpt4 book ai didi

scala - 直方图 - 以并行方式进行

转载 作者:行者123 更新时间:2023-12-02 02:33:02 43 4
gpt4 key购买 nike

+----+----+--------+
| Id | M1 | trx |
+----+----+--------+
| 1 | M1 | 11.35 |
| 2 | M1 | 3.4 |
| 3 | M1 | 10.45 |
| 2 | M1 | 3.95 |
| 3 | M1 | 20.95 |
| 2 | M2 | 25.55 |
| 1 | M2 | 9.95 |
| 2 | M2 | 11.95 |
| 1 | M2 | 9.65 |
| 1 | M2 | 14.54 |
+----+----+--------+

使用上面的数据框,我应该能够使用下面的代码生成如下的直方图。 Similar Queston is here

val (Range,counts) = df
.select(col("trx"))
.rdd.map(r => r.getDouble(0))
.histogram(10)
// Range: Array[Double] = Array(3.4, 5.615, 7.83, 10.045, 12.26, 14.475, 16.69, 18.905, 21.12, 23.335, 25.55)
// counts: Array[Long] = Array(2, 0, 2, 3, 0, 1, 0, 1, 0, 1)

但这里的问题是,如何根据“M1”列并行创建直方图?这意味着我需要为列值 M1 和 M2 提供两个直方图输出。

最佳答案

首先,您需要知道直方图生成两个单独的顺序作业。一种用于检测数据的最小值和最大值,另一种用于计算实际的直方图。您可以使用 Spark UI 进行检查。

我们可以按照相同的方案在您希望的任意数量的列上构建直方图,只需两项工作。然而,我们不能使用直方图函数,该函数仅用于处理一组 double 。需要我们自己去实现。第一份工作非常简单。

val Row(min_trx : Double, max_trx : Double) = df.select(min('trx), max('trx)).head

然后我们在本地计算直方图的范围。请注意,我对所有列使用相同的范围。它允许轻松比较列之间的结果(通过将它们绘制在同一图上)。不过,每列具有不同的范围只是对此代码的一个小修改。

val hist_size = 10
val hist_step = (max_trx - min_trx) / hist_size
val hist_ranges = (1 until hist_size)
.scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
// I add max_trx manually to avoid rounding errors that would exclude the value

这是第一部分。然后,我们可以使用 UDF 来确定每个值的最终范围,并与 Spark 并行计算所有直方图。

val range_index = udf((x : Double) => hist_ranges.lastIndexWhere(x >= _))
val hist_df = df
.withColumn("rangeIndex", range_index('trx))
.groupBy("M1", "rangeIndex")
.count()
// And voilà, all the data you need is there.
hist_df.show()
+---+----------+-----+
| M1|rangeIndex|count|
+---+----------+-----+
| M2| 2| 2|
| M1| 0| 2|
| M2| 5| 1|
| M1| 3| 2|
| M2| 3| 1|
| M1| 7| 1|
| M2| 10| 1|
+---+----------+-----+

作为奖励,您可以使用 RDD API 或通过收集数据帧并在 scala 中修改它来调整数据以在本地(在驱动程序内)使用它。

这是使用 Spark 实现此目的的一种方法,因为这是一个关于 Spark 的问题;-)

val hist_map = hist_df.rdd
.map(row => row.getAs[String]("M1") ->
(row.getAs[Int]("rangeIndex"), row.getAs[Long]("count")))
.groupByKey
.mapValues( _.toMap)
.mapValues( hists => (1 to hist_size)
.map(i => hists.getOrElse(i, 0L)).toArray )
.collectAsMap

编辑:如何为每列值构建一个范围:

我们不是计算 M1 的最小值和最大值,而是使用 groupBy 计算列的每个值。

val min_max_map = df.groupBy("M1")
.agg(min('trx), max('trx))
.rdd.map(row => row.getAs[String]("M1") ->
(row.getAs[Double]("min(trx)"), row.getAs[Double]("max(trx)")))
.collectAsMap // maps each column value to a tuple (min, max)

然后我们调整 UDF 以便它使用此映射,我们就完成了。

// for clarity, let's define a function that generates histogram ranges
def generate_ranges(min_trx : Double, max_trx : Double, hist_size : Int) = {
val hist_step = (max_trx - min_trx) / hist_size
(1 until hist_size).scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
}
// and use it to generate one range per column value
val range_map = min_max_map.keys
.map(key => key ->
generate_ranges(min_max_map(key)._1, min_max_map(key)._2, hist_size))
.toMap

val range_index = udf((x : Double, m1 : String) =>
range_map(m1).lastIndexWhere(x >= _))

最后,只需将 range_index('trx) 替换为 range_index('trx, 'M1),每列值就有一个范围。

关于scala - 直方图 - 以并行方式进行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59224388/

43 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com