gpt4 book ai didi

java - Scala 中的 Spark UDF 用于提取相关数据

转载 作者:行者123 更新时间:2023-11-30 02:03:05 24 4
gpt4 key购买 nike

我有一个数据框,其中有一列需要一些清理。我期待着一种可以应用于 Java/Scala 中的 Spark UDF 的正则表达式模式,该模式将从字符串中提取有效内容。

userId 列的示例输入行,如下面的 DataFrame 所示:

[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]

名为“userId”的列的预期转换:

一个字符串,看起来像:

105286112|115090439|29818926

我需要逻辑/方法来修改userId列,以便制作相同的UDF。正则表达式或其他方法可以发生这种情况吗?

输入 DataFrame 如下所示:

+--------------------+--------------------+
| dt_geo_cat_brand| userId |
+--------------------+--------------------+
|2017-10-30_17-18 ...|[[133207500,2017-...|
|2017-10-19_21-22 ...|[[194112773,2017-...|
|2017-10-29_17-18 ...|[[274188233,2017-...|
|2017-10-29_14-16 ...|[[86281353,2017-1...|
|2017-10-01_09-10 ...|[[92478766,2017-1...|
|2017-10-09_17-18 ...|[[156663365,2017-...|
|2017-10-06_17-18 ...|[[111869972,2017-...|
|2017-10-13_09-10 ...|[[64404465,2017-1...|
|2017-10-13_07-08 ...|[[146355663,2017-...|
|2017-10-22_21-22 ...|[[54096488,2017-1...|
+--------------------+--------------------+

架构:

root
|-- dt_geo_cat_brand: string (nullable = true)
|-- userId: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: string (nullable = true)
| | |-- _2: string (nullable = true)

所需输出:

+--------------------+--------------------+
| dt_geo_cat_brand| userId |
+--------------------+--------------------+
|2017-10-30_17-18 ...|133207500,1993333444|
|2017-10-19_21-22 ...|122122212,3432323333|
|2017-10-29_17-18 ...|274188233,8869696966|
|2017-10-29_14-16 ...|862813534,444344444,43444343434|
|2017-10-01_09-10 ...|92478766,880342342,4243244432,5554335535|
+--------------------+--------------------+

等等...

最佳答案

您不需要正则表达式来解决这个问题。数据被格式化为结构数组,查看架构,您想要的是每个结构的 _1 字符串。这可以通过 UDF 来解决,该 UDF 提取值,然后使用 mkString("|") 将所有内容转换为字符串以获得预期输出:

val extract_id = udf((arr: Seq[Row]) => { 
arr.map(_.getAs[String](0)).mkString("|")
})

df.withColumn("userId", extract_id($"userId"))
<小时/>

根据评论 #1 添加:

如果您想将按 dt_geo_cat_brand 分区的结果保存在 csv 文件中(所有值都在其自己的行上),您可以按如下方式操作。首先,从 udf 返回一个列表而不是字符串并使用 explode:

val extract_id = udf((arr: Seq[Row]) => { 
arr.map(_.getAs[String](0))
})

val df2 = df.withColumn("userId", explode(extract_id($"userId")))

然后在保存时使用partitionBy(dt_geo_cat_brand)。这将根据 dt_geo_cat_brand 列中的值创建文件夹结构。根据分区的不同,每个文件夹中的 csv 文件数量可能有所不同,但它们的值都来自 dt_geo_cat_brand 中的单个值(在保存之前使用 repartition(1),如果您需要一个文件并且有足够的内存)。

df2.write.partitionBy("dt_geo_cat_brand").csv(baseOutputBucketPath)
<小时/>

根据评论 #2 进行补充:

要在另存为单独文件时不使用 partitionBy,您可以执行以下操作(建议使用 partitioBy 方法)。首先,在 dt_geo_cat_brand 中查找所有不同的值:

val vals = df.select("dt_geo_cat_brand").distinct().as[String].collect()

对于每个值,过滤数据帧并保存它(此处使用分解的 df2 数据帧作为附加#1):

vals.foreach { v =>
df2.filter($"dt_geo_cat_brand" === v)
.select("userId")
.write
.csv(s"$baseOutputBucketPath=$v/")})
}

或者,如果使用了 udf,则不要使用分解的数据帧,而是在 "|" 上分割:

vals.foreach { v =>
df.filter($"dt_geo_cat_brand" === v)
.select(split($"userId", "\\|").as("userId"))
.write
.csv(s"$baseOutputBucketPath=$v/")})
}

关于java - Scala 中的 Spark UDF 用于提取相关数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52175802/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com