gpt4 book ai didi

scala - 如何在 Spark SQL 中为自定义类型定义模式?

转载 作者:行者123 更新时间:2023-12-03 10:28:51 26 4
gpt4 key购买 nike

以下示例代码尝试将一些案例对象放入数据框中。该代码包括使用此特征的案例对象层次结构和案例类的定义:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String, t: Some)

object Example {
def main(args: Array[String]) : Unit = {
val conf = new SparkConf()
.setAppName( "Example" )
.setMaster( "local[*]")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
df.show()
}
}

执行代码时,不幸遇到如下异常:
java.lang.UnsupportedOperationException: Schema for type Some is not supported

问题
  • 是否有可能为某些类型添加或定义模式(此处键入 Some )?
  • 是否存在另一种方法来表示这种枚举?
  • 我尝试使用 Enumeration直接,也没有成功。 (见下文)
  • Enumeration 的代码:
    object Some extends Enumeration {
    type Some = Value
    val AType, BType = Value
    }

    提前致谢。我希望,最好的方法是不要使用字符串。

    最佳答案

    Spark 2.0.0+ :
    UserDefinedType已在 Spark 2.0.0 中设为私有(private),目前还没有 Dataset友好更换。

    见:SPARK-14155 (Hide UserDefinedType in Spark 2.0)

    大多数时候是静态类型的Dataset可以作为替代品
    有一个待处理的 Jira SPARK-7768使用目标版本 2.4 再次公开 UDT API。

    另见 How to store custom objects in Dataset?

    Spark < 2.0.0

    Is there a possibility to add or define a schema for certain types (here type Some)?



    我想答案取决于你有多需要这个。看起来可以创建 UserDefinedType但它需要访问 DeveloperApi并且不完全简单或有据可查。

    import org.apache.spark.sql.types._

    @SQLUserDefinedType(udt = classOf[SomeUDT])
    sealed trait Some
    case object AType extends Some
    case object BType extends Some

    class SomeUDT extends UserDefinedType[Some] {
    override def sqlType: DataType = IntegerType

    override def serialize(obj: Any) = {
    obj match {
    case AType => 0
    case BType => 1
    }
    }

    override def deserialize(datum: Any): Some = {
    datum match {
    case 0 => AType
    case 1 => BType
    }
    }

    override def userClass: Class[Some] = classOf[Some]
    }

    您可能应该覆盖 hashCodeequals也是。

    它的 PySpark 对应物看起来像这样:

    from enum import Enum, unique
    from pyspark.sql.types import UserDefinedType, IntegerType

    class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(self):
    return IntegerType()

    @classmethod
    def module(cls):
    return cls.__module__

    @classmethod
    def scalaUDT(cls): # Required in Spark < 1.5
    return 'net.zero323.enum.SomeUDT'

    def serialize(self, obj):
    return obj.value

    def deserialize(self, datum):
    return {x.value: x for x in Some}[datum]

    @unique
    class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1

    在 Spark < 1.5 中,Python UDT 需要成对的 Scala UDT,但在 1.5 中似乎不再是这种情况。

    对于像您这样的简单 UDT,您可以使用简单类型(例如 IntegerType 而不是整个 Struct )。

    关于scala - 如何在 Spark SQL 中为自定义类型定义模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32440461/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com