gpt4 book ai didi

apache-spark - 为什么 BigDecimal 的 Spark groupBy.agg(min/max) 总是返回 0?

转载 作者:行者123 更新时间:2023-12-04 05:12:15 25 4
gpt4 key购买 nike

我正在尝试按 DataFrame 的一列分组,并生成 minmax每个结果组中 BigDecimal 列的值。结果总是产生一个非常小的(大约 0)值。

(类似 min/max 对 Double 列的调用会产生预期的非零值。)

作为一个简单的例子:

如果我创建以下数据帧:

import org.apache.spark.sql.{functions => f}

case class Foo(group: String, bd_value: BigDecimal, d_value: Double)

val rdd = spark.sparkContext.parallelize(Seq(
Foo("A", BigDecimal("1.0"), 1.0),
Foo("B", BigDecimal("10.0"), 10.0),
Foo("B", BigDecimal("1.0"), 1.0),
Foo("C", BigDecimal("10.0"), 10.0),
Foo("C", BigDecimal("10.0"), 10.0),
Foo("C", BigDecimal("10.0"), 10.0)
))

val df = rdd.toDF()

选择 max Double 或 BigDecimal 列返回预期结果:

df.select(f.max("d_value")).show()

// +------------+
// |max(d_value)|
// +------------+
// | 10.0|
// +------------+

df.select(f.max("bd_value")).show()

// +--------------------+
// | max(bd_value)|
// +--------------------+
// |10.00000000000000...|
// +--------------------+

但是如果我分组然后聚合,我会得到 Double 列的合理结果,但 BigDecimal 列的值接近零:

df.groupBy("group").agg(f.max("d_value")).show()

// +-----+------------+
// |group|max(d_value)|
// +-----+------------+
// | B| 10.0|
// | C| 10.0|
// | A| 1.0|
// +-----+------------+

df.groupBy("group").agg(f.max("bd_value")).show()

// +-----+-------------+
// |group|max(bd_value)|
// +-----+-------------+
// | B| 1.00E-16|
// | C| 1.00E-16|
// | A| 1.0E-17|
// +-----+-------------+

为什么 spark 对于这些 min/max 返回零结果电话?

最佳答案

TL; 博士

Spark 如何处理 BigDecimals 在问题中显示的特定情况下表现出来的规模似乎不一致。代码的行为就像是使用 BigDecimal 对象的比例将 Long s 转换为未缩放的 BigDecimal s,然后使用模式的比例转换回 BigDecimal

这可以通过以下任一方式解决

  • 使用 BigDecimal
  • 显式设置所有 setScale 值的比例以匹配 DataFrame 的架构
  • 手动指定模式并从 RDD[Row] 创建 DF

  • 长版

    这是我认为在我的带有 Spark 2.4.0 的机器上发生的事情。

    groupBy.max 的情况下,Spark 正在通过 UnsafeRow 并将 BigDecimal 转换为未缩放的 Long 并将其作为 Byte 数组存储在 this 行的 setDecimal 中(通过打印语句验证)。然后,当它稍后调用 getDecimal 时,它使用模式中指定的比例将字节数组转换回 BigDecimal

    如果原始值中的比例与架构中的比例不匹配,则会导致值不正确。例如,
    val foo = BigDecimal(123456)
    foo.scale
    0

    val bytes = foo.underlying().unscaledValue().toByteArray()

    // convert the bytes into BigDecimal using the original scale -- correct value
    val sameValue = BigDecimal(new java.math.BigInteger(bytes), 0)
    sameValue: scala.math.BigDecimal = 123456

    // convert the bytes into BigDecimal using scale 18 -- wrong value
    val smaller = BigDecimal(new java.math.BigInteger(bytes), 18)
    smaller: scala.math.BigDecimal = 1.23456E-13


    如果我只选择 bd_value 列的最大值,Spark 似乎不会通过 setDecimal 。我还没有验证为什么,或者它去了哪里。

    但是,这可以解释问题中观察到的值。使用相同的案例类 Foo :
    // This BigDecimal has scale 0
    val rdd = spark.sparkContext.parallelize(Seq(Foo("C", BigDecimal(123456), 123456.0)))

    // And shows with scale 0 in the DF
    rdd.toDF.show
    +-----+--------+--------+
    |group|bd_value| d_value|
    +-----+--------+--------+
    | C| 123456|123456.0|
    +-----+--------+--------+

    // But the schema has scale 18
    rdd.toDF.printSchema
    root
    |-- group: string (nullable = true)
    |-- bd_value: decimal(38,18) (nullable = true)
    |-- d_value: double (nullable = false)


    // groupBy + max corrupts in the same way as converting to bytes via unscaled, then to BigDecimal with scale 18
    rdd.groupBy("group").max("bd_value").show
    +-----+-------------+
    |group|max(bd_value)|
    +-----+-------------+
    | C| 1.23456E-13|
    +-----+-------------+

    // This BigDecimal is forced to have the same scale as the inferred schema
    val rdd = spark.sparkContext.parallelize(Seq(Foo("C",BigDecimal(123456).setScale(18), 123456.0)))

    // verified the scale is 18 in the DF
    +-----+--------------------+--------+
    |group| bd_value| d_value|
    +-----+--------------------+--------+
    | C|123456.0000000000...|123456.0|
    +-----+--------------------+--------+


    // And it works as expected
    rdd1.groupBy("group").max("bd_value").show

    +-----+--------------------+
    |group| max(bd_value)|
    +-----+--------------------+
    | C|123456.0000000000...|
    +-----+--------------------+


    这也可以解释为什么,正如在评论中所观察到的那样,当从具有显式模式的 RDD[Row] 转换时它可以正常工作。
    val rdd2 = spark.sparkContext.parallelize(Seq(Row("C", BigDecimal(123456), 123456.0)))

    // schema has BigDecimal scale 18
    val schema = StructType(Seq(StructField("group", StringType, true), StructField("bd_value", DecimalType(38,18), true), StructField("d_value",DoubleType,false)))

    // createDataFrame interprets the value into the schema's scale
    val df = spark.createDataFrame(rdd2, schema)

    df.show

    +-----+--------------------+--------+
    |group| bd_value| d_value|
    +-----+--------------------+--------+
    | C|123456.0000000000...|123456.0|
    +-----+--------------------+--------+

    关于apache-spark - 为什么 BigDecimal 的 Spark groupBy.agg(min/max) 总是返回 0?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54640777/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com