gpt4 book ai didi

scala - 如何使用 spark 将 Parquet 数据转换为案例类?

转载 作者:行者123 更新时间:2023-12-02 08:23:59 25 4
gpt4 key购买 nike

我有很多案例类,我在 spark 中使用它们将数据保存为 Parquet ,例如:

case class Person(userId: String,
technographic: Option[Technographic] = None,
geographic: Option[Geographic] = None)

case class Technographic(browsers: Seq[Browser],
devices: Seq[Device],
oss: Seq[Os])

case class Browser(family: String,
major: Option[String] = None,
language: String

...

如何将磁盘上的数据转换回这些案例类?

我需要能够选择多个列并将它们展开,以便每个列表(例如 浏览器)的所有子列表都具有相同的长度。

例如鉴于此原始数据:

Person(userId="1234",
technographic=Some(Technographic(browsers=Seq(
Browser(family=Some("IE"), major=Some(7), language=Some("en")),
Browser(family=None, major=None, language=Some("en-us")),
Browser(family=Some("Firefox), major=None, language=None)
)),
geographic=Some(Geographic(...))
)

我需要,例如浏览器数据如下(以及能够选择所有列):

family=IE, major=7, language=en
family=None, major=None, language=en-us
family=Firefox, major=None, language=None

如果 spark 可以 explode 每个列表项,我可以得到它。目前它只会做类似的事情(无论如何 explode 不会处理多列):

browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]

那么如何使用 spark 1.5.2 从所有这些嵌套的可选数据中重建用户记录(生成一行数据的整个案例类集)?

一种可能的方法是:

val df = sqlContext.read.parquet(inputPath)
df.registerTempTable("person")
val fields = df.select("desc person")
df.select("select * from person").map { x =>
... // somehow zip `fields` with the values so that I can
// access values by column name instead of index
// (which is brittle), but how?
}

最佳答案

给定

case class Browser(family: String,
major: Option[Int] = None,
language: String)

case class Tech(browsers: Seq[Browser],
devices: Seq[String],
oss: Seq[String])


case class Person(userId: String,
tech: Option[Tech] = None,
geographic: Option[String] = None)

以及 org.apache.spark.sql.Row 的一些便利类型/函数

type A[E] = collection.mutable.WrappedArray[E]

implicit class RichRow(val r: Row) {
def getOpt[T](n: String): Option[T] = {
if (isNullAt(n)) {
None
} else {
Some(r.getAs[T](n))
}
}

def getStringOpt(n: String) = getOpt[String](n)
def getString(n: String) = getStringOpt(n).get

def getIntOpt(n: String) = getOpt[Int](n)
def getInt(n: String) = r.getIntOpt(n).get

def getArray[T](n: String) = r.getAs[A[T]](n)

def getRow(n: String) = r.getAs[Row](n)
def getRows(n: String) = r.getAs[A[Row]](n)

def isNullAt(n: String) = r.isNullAt(r.fieldIndex(n))
}

然后解析可以组织在一些函数中:

def toBrowser(r: Row): Browser = {
Browser(
r.getString("family"),
r.getIntOpt("major"),
r.getString("language"))
}

def toBrowsers(rows: A[Row]): Seq[Browser] = {
rows.map(toBrowser)
}

def toTech(r: Row): Tech = {
Tech(
toBrowsers(r.getRows("browsers")),
r.getArray[String]("devices"),
r.getArray[String]("oss"))
}

def toTechOpt(r: Row): Option[Tech] = {
Option(r).map(toTech)
}

def toPerson(r: Row): Person = {
Person(
r.getString("userId"),
toTechOpt(r.getRow("tech")),
r.getStringOpt("geographic"))
}

这样你就可以写

df.map(toPerson).collect().foreach(println)

  • 我已将解析函数组织成“独立”方法。我通常会将它们作为 apply 放入案例类的伴随对象中,或者也作为 Row 的隐式值类。函数的原因是这更容易粘贴到 spark-shell

  • 每个解析函数直接处理普通列和数组,但在遇到集合时委托(delegate)给另一个函数(SeqOption - 这些代表下一个嵌套水平)

  • 隐式类 应该extend AnyVal,但是同样不能将其粘贴到spark-shell

    /li>

关于scala - 如何使用 spark 将 Parquet 数据转换为案例类?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34071174/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com