gpt4 book ai didi

scala - 如何验证 Spark Dataframe 的内容

转载 作者:行者123 更新时间:2023-12-03 01:53:15 24 4
gpt4 key购买 nike

我有以下 Scala Spark 代码库,它运行良好,但不应该。

第二列包含混合类型的数据,而在 Schema 中我将其定义为 IntegerType。我的实际程序有超过 100 列,并且在转换后不断派生多个子 DataFrame

如何验证 RDD 或 DataFrame 字段的内容是否具有正确的数据类型值,从而忽略无效行或将列内容更改为某个默认值。对于使用 DataFrameRDD 进行数据质量检查的任何更多指示,我们表示赞赏。

var theSeq = Seq(("X01", "41"),
("X01", 41),
("X01", 41),
("X02", "ab"),
("X02", "%%"))

val newRdd = sc.parallelize(theSeq)
val rowRdd = newRdd.map(r => Row(r._1, r._2))

val theSchema = StructType(Seq(StructField("ID", StringType, true),
StructField("Age", IntegerType, true)))
val theNewDF = sqc.createDataFrame(rowRdd, theSchema)
theNewDF.show()

最佳答案

首先,传递schema只是避免类型推断的一种方法。在 DataFrame 创建期间不会对其进行验证或强制执行。顺便说一句,我不会将 ClassCastException 描述为运行良好。有那么一刻我以为你真的发现了一个错误。

我认为重要的问题是如何首先获取像 theSeq/newRdd 这样的数据。是你自己解析的东西,还是从外部组件接收的?只需查看类型(分别是 Seq[(String, Any)]/RDD[(String, Any)] ),您就已经知道它不是一个有效的输入数据帧。处理这个级别的事情的方法可能是采用静态类型。 Scala 提供了很多巧妙的方法来处理意外情况(TryEitherOption),其中最后一个是最简单的方法,如下所示一个额外的好处是与 Spark SQL 配合得很好。处理事情的相当简单的方法可能看起来像这样

def validateInt(x: Any) = x match {
case x: Int => Some(x)
case _ => None
}

def validateString(x: Any) = x match {
case x: String => Some(x)
case _ => None
}

val newRddOption: RDD[(Option[String], Option[Int])] = newRdd.map{
case (id, age) => (validateString(id), validateInt(age))}

由于 Options 可以轻松组合,因此您可以添加额外的检查,如下所示:

def validateAge(age: Int) = {
if(age >= 0 && age < 150) Some(age)
else None
}

val newRddValidated: RDD[(Option[String], Option[Int])] = newRddOption.map{
case (id, age) => (id, age.flatMap(validateAge))}

Next 而不是 Row 这是一个非常粗糙的容器,我将使用案例类:

case class Record(id: Option[String], age: Option[Int])

val records: RDD[Record] = newRddValidated.map{case (id, age) => Record(id, age)}

此时您所要做的就是调用toDF:

import org.apache.spark.sql.DataFrame

val df: DataFrame = records.toDF
df.printSchema

// root
// |-- id: string (nullable = true)
// |-- age: integer (nullable = true)

这是一种困难但可以说是一种更优雅的方式。更快的方法是让 SQL 转换系统为您完成工作。首先让我们将所有内容转换为字符串:

val stringRdd: RDD[(String, String)] = sc.parallelize(theSeq).map(
p => (p._1.toString, p._2.toString))

接下来创建一个数据框:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

val df: DataFrame = stringRdd.toDF("id", "age")

val expectedTypes = Seq(StringType, IntegerType)
val exprs: Seq[Column] = df.columns.zip(expectedTypes).map{
case (c, t) => col(c).cast(t).alias(c)}

val dfProcessed: DataFrame = df.select(exprs: _*)

结果:

dfProcessed.printSchema

// root
// |-- id: string (nullable = true)
// |-- age: integer (nullable = true)


dfProcessed.show

// +---+----+
// | id| age|
// +---+----+
// |X01| 41|
// |X01| 41|
// |X01| 41|
// |X02|null|
// |X02|null|
// +---+----+

关于scala - 如何验证 Spark Dataframe 的内容,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33270907/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com