
scala spark UDF filter array of structs


I have a dataframe with the following schema:

root
|-- x: Long (nullable = false)
|-- y: Long (nullable = false)
|-- features: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- score: double (nullable = true)

For example, I have the data:
+--------------------+--------------------+------------------------------------------+
| x | y | features |
+--------------------+--------------------+------------------------------------------+
|10 | 9 |[["f1", 5.9], ["ft2", 6.0], ["ft3", 10.9]]|
|11 | 0 |[["f4", 0.9], ["ft1", 4.0], ["ft2", 0.9] ]|
|20 | 9 |[["f5", 5.9], ["ft2", 6.4], ["ft3", 1.9] ]|
|18 | 8 |[["f1", 5.9], ["ft4", 8.1], ["ft2", 18.9]]|
+--------------------+--------------------+------------------------------------------+
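(For reference, one way to reproduce this dataframe is the following minimal sketch; the `Feature` case class name is my own, not from the question:)

import org.apache.spark.sql.SparkSession

case class Feature(name: String, score: Double)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// toDF over a case class yields the array<struct<name,score>> schema shown above
val df = Seq(
  (10L, 9L, Seq(Feature("f1", 5.9), Feature("ft2", 6.0), Feature("ft3", 10.9))),
  (11L, 0L, Seq(Feature("f4", 0.9), Feature("ft1", 4.0), Feature("ft2", 0.9))),
  (20L, 9L, Seq(Feature("f5", 5.9), Feature("ft2", 6.4), Feature("ft3", 1.9))),
  (18L, 8L, Seq(Feature("f1", 5.9), Feature("ft4", 8.1), Feature("ft2", 18.9)))
).toDF("x", "y", "features")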

I want to filter the features by a specific prefix, say "ft", so in the end I want this result:
+--------------------+--------------------+-----------------------------+
| x | y | features |
+--------------------+--------------------+-----------------------------+
|10 | 9 |[["ft2", 6.0], ["ft3", 10.9]]|
|11 | 0 |[["ft1", 4.0], ["ft2", 0.9] ]|
|20 | 9 |[["ft2", 6.4], ["ft3", 1.9] ]|
|18 | 8 |[["ft4", 8.1], ["ft2", 18.9]]|
+--------------------+--------------------+-----------------------------+

I am not on Spark 2.4+, so I cannot use the solution provided here: Spark (Scala) filter array of structs without explode
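(For contrast only, since it is unavailable below 2.4: the linked solution comes down to the filter higher-order function in Spark SQL, roughly like this:)

import org.apache.spark.sql.functions.expr

// Spark 2.4+ only: filter the array in SQL, no explode and no UDF needed
df.withColumn("features", expr("filter(features, f -> f.name like 'ft%')"))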

I tried using a UDF, but it still didn't work. Here is my attempt. I defined a UDF:
def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }
  )

But if I apply this UDF:
df.withColumn("filtered", filterFeature($"features"))

I got the error Schema for type org.apache.spark.sql.Row is not supported. It turns out you cannot return a Row from a UDF. So then I tried:
def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, (StringType, DoubleType)
  )

Then I got an error:
 error: type mismatch;
found : (org.apache.spark.sql.types.StringType.type, org.apache.spark.sql.types.DoubleType.type)
required: org.apache.spark.sql.types.DataType
}, (StringType, DoubleType)
^

I also tried a case class, as suggested in some answers:
case class FilteredFeature(featureName: String, featureScore: Double)
def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, FilteredFeature
  )

But I got:
 error: type mismatch;
found : FilteredFeature.type
required: org.apache.spark.sql.types.DataType
}, FilteredFeature
^

I tried:
case class FilteredFeature(featureName: String, featureScore: Double)
def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, Seq[FilteredFeature]
  )

I got:
<console>:192: error: missing argument list for method apply in class GenericCompanion
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `apply _` or `apply(_)` instead of `apply`.
}, Seq[FilteredFeature]
^

I tried:
case class FilteredFeature(featureName: String, featureScore: Double)
def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, Seq[FilteredFeature](_)
  )

I got:
<console>:201: error: type mismatch;
found : Seq[FilteredFeature]
required: FilteredFeature
}, Seq[FilteredFeature](_)
^

What should I do in this case?

Best Answer

You have two options:

a) provide the UDF with a schema, which lets you return Seq[Row]
b) convert the Seq[Row] into a Seq of Tuple2 or of a case class, in which case you don't need to provide a schema (but the struct field names are lost if you use tuples!); see the sketch at the end of this answer

For your case, I would prefer option a) (it also works for structs with many fields):

val schema = df.schema("features").dataType

val filterFeature = udf((features: Seq[Row]) =>
  features.filter(_.getAs[String]("name").startsWith("ft")), schema)
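Applied the same way as in the question (writing back into the features column to match the desired output), this should give the expected result:

df.withColumn("features", filterFeature($"features")).show(false)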

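For completeness, a minimal sketch of option b), assuming a helper case class Feature (an illustrative name, as in the setup sketch above): map each Row into the case class first, so no schema argument is needed and the struct field names are preserved.

case class Feature(name: String, score: Double)

// Returning a Seq of case-class instances lets Spark derive the result schema itself
val filterFeatureB = udf((features: Seq[Row]) =>
  features
    .map(r => Feature(r.getAs[String]("name"), r.getAs[Double]("score")))
    .filter(_.name.startsWith("ft"))
)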
Regarding scala spark UDF filter array of structs, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59999974/
