gpt4 book ai didi

java - Spark SQL : using collect_set over array values?

转载 作者:行者123 更新时间:2023-11-29 04:11:56 26 4
gpt4 key购买 nike

我有一个聚合 DataFrame,其中有一列是使用 collect_set 创建的。我现在需要再次聚合此 DataFrame,并再次将 collect_set 应用于该列的值。问题是我需要应用 collect_Set ver 集合的值 - 到目前为止,我看到的唯一方法是分解聚合的 DataFrame。有没有更好的办法?

示例:

初始数据帧:

country   | continent   | attributes
-------------------------------------
Canada | America | A
Belgium | Europe | Z
USA | America | A
Canada | America | B
France | Europe | Y
France | Europe | X

聚合 DataFrame(我收到的输入数据)- country 聚合:

country   | continent   | attributes
-------------------------------------
Canada | America | A, B
Belgium | Europe | Z
USA | America | A
France | Europe | Y, X

我想要的输出 - 在 continent 上的聚合:

continent   | attributes
-------------------------------------
America | A, B
Europe | X, Y, Z

最佳答案

由于此时您只能拥有少量行,因此您只需按原样收集属性并将结果展平 (Spark >= 2.4)

import org.apache.spark.sql.functions.{collect_set, flatten, array_distinct}

val byState = Seq(
("Canada", "America", Seq("A", "B")),
("Belgium", "Europe", Seq("Z")),
("USA", "America", Seq("A")),
("France", "Europe", Seq("Y", "X"))
).toDF("country", "continent", "attributes")

byState
.groupBy("continent")
.agg(array_distinct(flatten(collect_set($"attributes"))) as "attributes")
.show
+---------+----------+
|continent|attributes|
+---------+----------+
| Europe| [Y, X, Z]|
| America| [A, B]|
+---------+----------+

在一般情况下,事情要难处理得多,在许多情况下,如果您期望大型列表,每组有很多重复项和许多值,最佳解决方案*是从头开始重新计算结果,即

input.groupBy($"continent").agg(collect_set($"attributes") as "attributes")

一种可能的替代方法是使用 Aggregator

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}
import scala.collection.mutable.{Set => MSet}


class MergeSets[T, U](f: T => Seq[U])(implicit enc: Encoder[Seq[U]]) extends
Aggregator[T, MSet[U], Seq[U]] with Serializable {

def zero = MSet.empty[U]

def reduce(acc: MSet[U], x: T) = {
for { v <- f(x) } acc.add(v)
acc
}

def merge(acc1: MSet[U], acc2: MSet[U]) = {
acc1 ++= acc2
}

def finish(acc: MSet[U]) = acc.toSeq
def bufferEncoder: Encoder[MSet[U]] = Encoders.kryo[MSet[U]]
def outputEncoder: Encoder[Seq[U]] = enc

}

并按如下方式应用

case class CountryAggregate(
country: String, continent: String, attributes: Seq[String])

byState
.as[CountryAggregate]
.groupByKey(_.continent)
.agg(new MergeSets[CountryAggregate, String](_.attributes).toColumn)
.toDF("continent", "attributes")
.show
+---------+----------+
|continent|attributes|
+---------+----------+
| Europe| [X, Y, Z]|
| America| [B, A]|
+---------+----------+

但这显然不是 Java 友好的选项。

另见 How to aggregate values into collection after groupBy? (类似,但没有唯一性约束)。


* 这是因为 explode 可能非常昂贵,尤其是在较旧的 Spark 版本中,与访问 SQL 集合的外部表示相同。

关于java - Spark SQL : using collect_set over array values?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54616498/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com