scala - How to aggregate values into a collection after groupBy?


I have a dataframe with a schema like this:

[visitorId: string, trackingIds: array<string>, emailIds: array<string>]

I'm looking for a way to group (or maybe roll up?) this dataframe by visitorId so that the trackingIds and emailIds columns are appended together. For example, if my initial df looks like:

+---------+------------+--------+
|visitorId| trackingIds|emailIds|
+---------+------------+--------+
|     a158|      [666b]|    [12]|
|     7g21|      [c0b5]|    [45]|
|     7g21|      [c0b4]|    [87]|
|     a158|[666b, 777c]|      []|
+---------+------------+--------+

I'd like my output df to look like this:

+---------+------------------+--------+
|visitorId|       trackingIds|emailIds|
+---------+------------------+--------+
|     a158|[666b, 666b, 777c]|[12, '']|
|     7g21|      [c0b5, c0b4]|[45, 87]|
+---------+------------------+--------+

I tried using the groupBy and agg operators but had no luck.

Best Answer

Spark >= 2.4

You can replace the flatten udf with the built-in flatten function:

import org.apache.spark.sql.functions.flatten

and keep the rest as is.
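
Concretely, the aggregation shown further down then reduces to something like the sketch below (assuming the dfWithPlaceholders and column names built in the Spark >= 2.0 section):

import org.apache.spark.sql.functions.{collect_list, flatten}

// the built-in flatten (Spark 2.4+) takes the place of the udf of the same name
dfWithPlaceholders
  .groupBy($"visitorId")
  .agg(
    flatten(collect_list($"trackingIds")).alias("trackingIds"),
    flatten(collect_list($"emailIds")).alias("emailIds"))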

Spark >= 2.0, < 2.4

It is possible, but quite expensive. Using the data you've provided:

case class Record(
  visitorId: String, trackingIds: Array[String], emailIds: Array[String])

val df = Seq(
  Record("a158", Array("666b"), Array("12")),
  Record("7g21", Array("c0b5"), Array("45")),
  Record("7g21", Array("c0b4"), Array("87")),
  Record("a158", Array("666b", "777c"), Array.empty[String])).toDF

and a helper function:

import org.apache.spark.sql.functions.udf

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)
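
For intuition, the udf just concatenates the nested sequences that collect_list will produce per group; a small illustration of the plain Scala operation on the a158 group from the sample data:

Seq(Seq("666b"), Seq("666b", "777c")).flatten
// List(666b, 666b, 777c)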

We can fill in the blanks with placeholders:

import org.apache.spark.sql.functions.{array, lit, size, when}

// Replace empty emailIds arrays with a single empty-string placeholder
val dfWithPlaceholders = df.withColumn(
  "emailIds",
  when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))

collect_list and flatten:

import org.apache.spark.sql.functions.{array, collect_list}

val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")

dfWithPlaceholders
  .groupBy($"visitorId")
  .agg(trackingIds, emailIds)

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+

With a statically typed Dataset:

dfWithPlaceholders.as[Record]
  .groupByKey(_.visitorId)
  .mapGroups { case (key, vs) =>
    vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
      case (trackingIds, emailIds) =>
        Record(key, trackingIds.flatten, emailIds.flatten)
    }
  }

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+

Spark 1.x

You can convert to an RDD and group by key:

import org.apache.spark.sql.Row

dfWithPlaceholders.rdd
  .map {
    case Row(
        id: String,
        trcks: Seq[String @unchecked],
        emails: Seq[String @unchecked]) => (id, (trcks, emails))
  }
  .groupByKey
  .map { case (key, vs) =>
    vs.toArray.unzip match {
      case (trackingIds, emailIds) =>
        Record(key, trackingIds.flatten, emailIds.flatten)
    }
  }
  .toDF

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// |     a158|[666b, 666b, 777c]|  [12, ]|
// +---------+------------------+--------+

Regarding scala - How to aggregate values into a collection after groupBy?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34202997/
