gpt4 book ai didi

scala - Spark(Scala)过滤结构数组而不爆炸

转载 作者:行者123 更新时间:2023-12-03 06:44:08 25 4
gpt4 key购买 nike

我有一个带有键的数据框和一个列,数据框列中有一个结构数组。每行包含一列,看起来像这样:

[
{"id" : 1, "someProperty" : "xxx", "someOtherProperty" : "1", "propertyToFilterOn" : 1},
{"id" : 2, "someProperty" : "yyy", "someOtherProperty" : "223", "propertyToFilterOn" : 0},
{"id" : 3, "someProperty" : "zzz", "someOtherProperty" : "345", "propertyToFilterOn" : 1}
]

现在我想做两件事:

  1. 过滤“propertyToFilterOn”= 1
  2. 将一些逻辑应用到其他逻辑上属性 - 例如连接

所以结果是:

[
{"id" : 1, "newProperty" : "xxx_1"},
{"id" : 3, "newProperty" : "zzz_345"}
]

我知道如何使用爆炸来做到这一点,但是在将其放回一起时,爆炸还需要在键上使用 groupBy 。但由于这是一个流数据帧,我还必须在其上添加水印,这是我试图避免的。

有没有其他方法可以在不使用爆炸的情况下实现这一目标?我确信有一些 Scala 魔法可以实现这一点!

谢谢!

最佳答案

spark 2.4+ 带来了许多用于数组的高阶函数。 (参见https://docs.databricks.com/spark/2.x/spark-sql/language-manual/functions.html)

val dataframe = Seq(
("a", 1, "xxx", "1", 1),
("a", 2, "yyy", "223", 0),
("a", 3, "zzz", "345", 1)
).toDF( "grouping_key", "id" , "someProperty" , "someOtherProperty", "propertyToFilterOn" )
.groupBy("grouping_key")
.agg(collect_list(struct("id" , "someProperty" , "someOtherProperty", "propertyToFilterOn")).as("your_array"))

dataframe.select("your_array").show(false)

+----------------------------------------------------+
|your_array |
+----------------------------------------------------+
|[[1, xxx, 1, 1], [2, yyy, 223, 0], [3, zzz, 345, 1]]|
+----------------------------------------------------+

您可以使用数组过滤器高阶函数过滤数组中的元素,如下所示:

val filteredDataframe = dataframe.select(expr("filter(your_array, your_struct -> your_struct.propertyToFilterOn == 1)").as("filtered_arrays"))

filteredDataframe.show(false)

+----------------------------------+
|filtered_arrays |
+----------------------------------+
|[[1, xxx, 1, 1], [3, zzz, 345, 1]]|
+----------------------------------+

对于您谈论的“其他逻辑”,您应该能够使用转换高阶数组函数,如下所示:

val tranformedDataframe = filteredDataframe
.select(expr("transform(filtered_arrays, your_struct -> struct(concat(your_struct.someProperty, '_', your_struct.someOtherProperty))"))

但是从转换函数返回结构存在问题,如本文所述:

http://mail-archives.apache.org/mod_mbox/spark-user/201811.mbox/%3CCALZs8eBgWqntiPGU8N=ENW2Qvu8XJMhnViKy-225ktW+_c0czA@mail.gmail.com%3E

所以你最好使用数据集 api 进行转换,如下所示:

case class YourStruct(id:String, someProperty: String, someOtherProperty: String)
case class YourArray(filtered_arrays: Seq[YourStruct])

case class YourNewStruct(id:String, newProperty: String)

val transformedDataset = filteredDataframe.as[YourArray].map(_.filtered_arrays.map(ys => YourNewStruct(ys.id, ys.someProperty + "_" + ys.someOtherProperty)))

val transformedDataset.show(false)

+--------------------------+
|value |
+--------------------------+
|[[1, xxx_1], [3, zzz_345]]|
+--------------------------+

关于scala - Spark(Scala)过滤结构数组而不爆炸,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54954732/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com