gpt4 book ai didi

scala - Spark 2.0数据集groupByKey并对操作和类型安全进行划分

转载 作者:行者123 更新时间:2023-12-04 13:37:15 25 4
gpt4 key购买 nike

我对Spark 2.0 DataSet感到非常满意,因为它具有编译时类型安全性。但是这里有几个我无法解决的问题,我也没有为此找到好的文档。

问题#1-对汇总列进行除法运算-
考虑下面的代码-
我有一个DataSet [MyCaseClass],我想在c1,c2,c3和sum(c4)/8上使用groupByKey。下面的代码可以很好地工作,如果我只是计算总和,但它会给出除法(8)的编译时错误。我想知道如何实现关注。

final case class MyClass (c1: String,
c2: String,
c3: String,
c4: Double)

val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded

import sparkSession.implicits._
import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}

myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
divide(8)). //this is breaking with exception
show()

如果我删除.divide(8)操作并在命令上方运行,它将给我以下输出。

+-----------+-------------+
| key|sum(c4) |
+-----------+-------------+
| [A1,F2,S1]| 80.0|
| [A1,F1,S1]| 40.0|
+-----------+-------------+

问题#2-将groupedByKey结果转换为另一个Typed DataFrame-
现在,我的问题的第二部分是我要再次输出类型化的DataSet。为此,我有另一个案例类(不确定是否需要),但不确定如何映射分组结果-
final case class AnotherClass(c1: String,
c2: String,
c3: String,
average: Double)

myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)")).
as[AnotherClass] //this is breaking with exception

但这又失败了,但有一个异常(exception),因为按关键结果分组的结果没有直接与AnotherClass映射。

PS:实现上述目标的任何其他解决方案都值得欢迎。

最佳答案

第一个问题可以通过完全使用类型化的列来解决(KeyValueGroupedDataset.agg期望TypedColumn(-s))
您可以将聚合结果定义为:

val eight = lit(8.0)
.as[Double] // Not necessary

val sumByEight = typedSum[MyClass](_.c4)
.divide(eight)
.as[Double] // Required
.name("div(sum(c4), 8)")

并将其插入以下代码:

val myCaseClass = Seq(
MyClass("a", "b", "c", 2.0),
MyClass("a", "b", "c", 3.0)
).toDS

myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(sumByEight)

要得到

+-------+---------------+
| key|div(sum(c4), 8)|
+-------+---------------+
|[a,b,c]| 0.625|
+-------+---------------+

第二个问题是使用不符合数据形状的类的结果。正确的表示形式可能是:

case class AnotherClass(key: (String, String, String), sum: Double)

与上面定义的数据一起使用:

 myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(typedSum[MyClass](_.c4).name("sum"))
.as[AnotherClass]

会给:
+-------+---+
| key|sum|
+-------+---+
|[a,b,c]|5.0|
+-------+---+

但是如果 .as[AnotherClass]可以接受,则在这里不需要 Dataset[((String, String, String), Double)]

您当然可以跳过所有操作,而仅跳过 mapGroups(尽管并非不影响性能):

import shapeless.syntax.std.tuple._   // A little bit of shapeless

val tuples = myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.mapGroups((group, iter) => group :+ iter.map(_.c4).sum)

结果
+---+---+---+---+   
| _1| _2| _3| _4|
+---+---+---+---+
| a| b| c|5.0|
+---+---+---+---+
reduceGroups可能是一个更好的选择:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.reduceGroups((x, y) => x.copy(c4=x.c4 + y.c4))

与产生的 Dataset:
+-------+-----------+    
| _1| _2|
+-------+-----------+
|[a,b,c]|[a,b,c,5.0]|
+-------+-----------+

关于scala - Spark 2.0数据集groupByKey并对操作和类型安全进行划分,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39946210/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com