
scala - UDF to filter a map by key in Scala


I have a Spark DataFrame with the following schema:

root
 |-- mapkey: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- bt: string (nullable = true)
 |    |    |    |-- bp: double (nullable = true)
 |    |    |    |-- z: struct (nullable = true)
 |    |    |    |    |-- w: integer (nullable = true)
 |    |    |    |    |-- h: integer (nullable = true)
 |-- uid: string (nullable = true)

I want to write a UDF that filters mapkey so that the key equals uid, returning only the values that pass the filter. I am trying the following:

val filterMap = udf((m: Map[String, Seq[Row]], uid: String) => {
  val s = Set(uid)
  m.filterKeys { s.contains(_) == true }
})

But I get the following error:

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:762)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:704)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:809)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:703)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:722)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:704)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:809)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:703)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:726)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:704)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:809)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:703)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:700)
  at org.apache.spark.sql.functions$.udf(functions.scala:3200)

Can someone point out what is wrong with the UDF?

Best Answer

It looks like your only option is to use case classes that match the inner structure of the Row:

case class MyStruct(w: Int, h: Int)
case class Element(id: String, bt: String, bp: Double, z: MyStruct)

Then you can use them in your UDF (surprisingly enough, this just works):

// sample data:
val df = Seq(
  (Map(
    "key1" -> Array(Element("1", "bt1", 0.1, MyStruct(1, 2)), Element("11", "bt11", 0.2, MyStruct(1, 3))),
    "key2" -> Array(Element("2", "bt2", 0.2, MyStruct(12, 22)))
  ), "key1")
).toDF("mapkey", "uid")

df.printSchema() // prints the right schema, as expected in post

// define UDF:
val filterMap = udf((m: Map[String, Seq[Element]], uid: String) => {
  m.filterKeys(_ == uid)
})

// use UDF:
df.withColumn("result", filterMap($"mapkey", $"uid")).show(false)

// prints:
// +-----------------------------------------------------------------+
// |result                                                           |
// +-----------------------------------------------------------------+
// |Map(key1 -> WrappedArray([1,bt1,0.1,[1,2]], [11,bt11,0.2,[1,3]]))|
// +-----------------------------------------------------------------+
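Two side notes that are not part of the original answer. First, Map.filterKeys returns a lazy view and is deprecated in Scala 2.13; an eager equivalent inside the UDF would be m.filter { case (k, _) => k == uid }. Second, on Spark 3.0 and later the same filtering can be done without a UDF (and without the case classes) via the built-in higher-order function map_filter. A minimal sketch, assuming Spark 3.0+ and the df defined above:

import org.apache.spark.sql.functions.{col, map_filter}

// keep only the map entries whose key equals the row's uid column
df.withColumn("result", map_filter(col("mapkey"), (k, v) => k === col("uid")))
  .show(false)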

Regarding "scala - UDF to filter a map by key in Scala", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49245061/
