gpt4 book ai didi

apache-spark - 编码后无法对自定义类型进行操作? Spark 数据集

转载 作者:行者123 更新时间:2023-12-04 15:16:45 26 4
gpt4 key购买 nike

假设你有这个(编码自定义类型的解决方案来自 this thread ):

// assume we handle custom type
class MyObj(val i: Int, val j: String)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val ds = spark.createDataset(Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c")))

当执行 ds.show 时,我得到:

+--------------------+
| value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

我理解这是因为内容被编码为内部 Spark SQL 二进制表示。但是我怎样才能像这样显示解码后的内容呢?

+---+---+
| _1| _2|
+---+---+
| 1| a|
| 2| b|
| 3| c|
+---+---+

更新1

显示内容不是最大的问题,更重要的是它可能导致处理数据集时出现问题,考虑这个例子:

// continue with the above code
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"),new MyObj(6, "b"),new MyObj(5, "c")))

ds.joinWith(ds2, ds("i") === ds2("i"), "inner")
// this gives a Runtime error: org.apache.spark.sql.AnalysisException: Cannot resolve column name "i" among (value);

这是否意味着,kryo-encoded 类型不能方便地进行类似joinWith 的操作?

How do we process custom type on Dataset then?
If we are not able to process it after it's encoded, what's the point of this kryo encoding solution on custom type?!

(下面@jacek 提供的解决方案对于case class 类型很好理解,但它仍然无法解码自定义类型)

最佳答案

以下对我有用,但似乎使用高级 API 来执行低级(反序列化)工作。

这并不是说应该这样做,而是表明这是可能的。

我不知道为什么 KryoDeserializer 不将字节反序列化为字节来自的对象。就是这样。

你的类定义和我的一个主要区别是这个案例让我可以使用以下技巧。同样,不知道为什么它使它成为可能。

scala> println(spark.version)
3.0.1

// Note that case keyword
case class MyObj(val i: Int, val j: String)
import org.apache.spark.sql.Encoders
implicit val myObjEncoder = Encoders.kryo[MyObj]
// myObjEncoder: org.apache.spark.sql.Encoder[MyObj] = class[value[0]: binary]

val ds = (Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c"))).toDS
// the Kryo deserializer gives bytes
scala> ds.printSchema
root
|-- value: binary (nullable = true)

scala> :type sc
org.apache.spark.SparkContext

// Let's deserialize the bytes into an object
import org.apache.spark.serializer.KryoSerializer
val ks = new KryoSerializer(sc.getConf)
// that begs for a generic UDF
val deserMyObj = udf { value: Array[Byte] =>
import java.nio.ByteBuffer
ks.newInstance.deserialize(ByteBuffer.wrap(value)).asInstanceOf[MyObj] }

val solution = ds.select(deserMyObj('value) as "result").select($"result.*")
scala> solution.show
+---+---+
| i| j|
+---+---+
| 1| a|
| 2| b|
| 3| c|
+---+---+

关于apache-spark - 编码后无法对自定义类型进行操作? Spark 数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64184387/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com