gpt4 book ai didi

postgresql - 将 PostgreSQL 数据库加载到 SchemaRDD

转载 作者:行者123 更新时间:2023-11-29 13:01:49 25 4
gpt4 key购买 nike

我在 PostgreSQL 中有一个包含 100 万行和 100 多列的数据源,我想使用 Spark SQL,所以我想转换这个数据源以获得 SchemaRDD .

Spark SQL Programming Guide 中介绍了两种方法, 一种是通过反射,这意味着我需要定义:

case class Row(Var1: Int, Var2: String, ...)

这很乏味,因为我有 100 多个列。

另一种方法是“以编程方式指定架构”,这意味着我需要定义:

val schema =
StructType(
Seq(StructField("Var1", IntegerType), StructField("Var2", StringType), ...))

这对我来说也很乏味。

实际上,还有另一个问题,因为我加载了我的 PostgreSQL数据库使用JdbcRDD类,但我发现我还需要在 mapRow 中定义模式JdbcRDD 的参数构造函数,看起来像:

def extractValues(r: ResultSet) = {
(r.getInt("Var1"), r.getString("Var2"), ...)
}
val dbRDD = new JdbcRDD(sc, createConnection,
"SELECT * FROM PostgreSQL OFFSET ? LIMIT ?",
0, 1000000, 1, extractValues)

这个 API 仍然要求我自己创建模式,更糟糕的是我需要重做类似的事情来转换这个 JdbcRDDSchemaRDD ,那将是非常笨拙的代码。

所以我想知道完成这项任务的最佳方法是什么?

最佳答案

您只需要支持有限数量的数据类型。为什么不使用

java.sql.ResultSetMetaData

例如

val rs = jdbcStatement.executeQuery("select * from myTable limit 1")
val rmeta = rs.getMetaData

读取一行,然后为每一列动态生成所需的 StructField。

你需要一个 case 语句来处理

val myStructFields = for (cx <- 0 until rmeta.getColumnCount) {
val jdbcType = rmeta.getColumnType(cx)
} yield StructField(rmeta.getColumnName(cx),jdbcToSparkType(jdbcType))

val mySchema = StructType(myStructFields.toSeq)

其中 jdbcToSparkType 沿以下几行:

  def jdbcToSparkType(jdbcType: Int) = {
jdbcType match {
case 4 => InteegerType
case 6 => FloatType
..
}

UPDATE 要生成 RDD[Row] :您将遵循类似的模式。在这种情况下,你会

val rows = for (rs.next) {
row = jdbcToSpark(rs)
} yield row

val rowRDD = sc.parallelize(rows)

在哪里

def jdbcToSpark(rs: ResultSet) = {
var rowSeq = Seq[Any]()
for (cx <- 0 to rs.getMetaData.getColumnCount) {
rs.getColumnType(cx) match {
case 4 => rowSeq :+ rs.getInt(cx)
..
}
}
Row.fromSeq(rowSeq)
}

然后 有效行

关于postgresql - 将 PostgreSQL 数据库加载到 SchemaRDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27674055/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com