gpt4 book ai didi

scala - 在 Spark 数据集中滚动你自己的 reduceByKey

转载 作者:行者123 更新时间:2023-12-03 11:37:14 26 4
gpt4 key购买 nike

除了 RDDs 之外,我正在尝试学习更多地使用 DataFrames 和 DataSets。对于 RDD,我知道我可以做 someRDD.reduceByKey((x,y) => x + y) ,但我没有看到 Dataset 的那个函数。所以我决定写一篇。

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
val result = mutable.HashMap.empty[(Long,Long),Int]
val keys = mutable.HashSet.empty[(Long,Long)]
y.keys.foreach(z => keys += z)
x.keys.foreach(z => keys += z)
for (elem <- keys) {
val s1 = if(x.contains(elem)) x(elem) else 0
val s2 = if(y.contains(elem)) y(elem) else 0
result(elem) = s1 + s2
}
result
})

但是,这会将所有内容返回给驱动程序。你会怎么写这个来返回 Dataset ?也许 mapPartition 并在那里做?

请注意,这会编译但无法运行,因为它没有 Map 的编码器然而

最佳答案

我假设您的目标是将此习语转换为数据集:

rdd.map(x => (x.someKey, x.someField))
.reduceByKey(_ + _)

// => returning an RDD of (KeyType, FieldType)

目前,我使用 Dataset API 找到的最接近的解决方案如下所示:
ds.map(x => (x.someKey, x.someField))          // [1]
.groupByKey(_._1)
.reduceGroups((a, b) => (a._1, a._2 + b._2))
.map(_._2) // [2]

// => returning a Dataset of (KeyType, FieldType)

// Comments:
// [1] As far as I can see, having a map before groupByKey is required
// to end up with the proper type in reduceGroups. After all, we do
// not want to reduce over the original type, but the FieldType.
// [2] required since reduceGroups converts back to Dataset[(K, V)]
// not knowing that our V's are already key-value pairs.

看起来不是很优雅,根据快速基准测试,它的性能也低得多,所以也许我们在这里遗漏了一些东西......

注意:另一种可能是使用 groupByKey(_.someKey)作为第一步。问题是使用 groupByKey将类型从常规更改 Dataset KeyValueGroupedDataset .后者没有正规的 map功能。相反,它提供了 mapGroups ,这看起来不太方便,因为它将值包装成 Iterator并根据文档字符串执行 shuffle。

关于scala - 在 Spark 数据集中滚动你自己的 reduceByKey,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38383207/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com