gpt4 book ai didi

apache-spark - spark-jdbc连接中如何操作numPartitions、lowerBound、upperBound?

转载 作者:行者123 更新时间:2023-12-05 08:18:44 33 4
gpt4 key购买 nike

我正在尝试使用 spark-jdbc 读取 postgres 数据库上的表。为此,我想出了以下代码:

object PartitionRetrieval {
var conf = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.default.parallelism", "20")
val log = LogManager.getLogger("Spark-JDBC Program")
Logger.getLogger("org").setLevel(Level.ERROR)
val conFile = "/home/myuser/ReconTest/inputdir/testconnection.properties"
val properties = new Properties()
properties.load(new FileInputStream(conFile))
val connectionUrl = properties.getProperty("gpDevUrl")
val devUserName = properties.getProperty("devUserName")
val devPassword = properties.getProperty("devPassword")
val driverClass = properties.getProperty("gpDriverClass")
val tableName = "base.ledgers"
try {
Class.forName(driverClass).newInstance()
} catch {
case cnf: ClassNotFoundException =>
log.error("Driver class: " + driverClass + " not found")
System.exit(1)
case e: Exception =>
log.error("Exception: " + e.printStackTrace())
System.exit(1)
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
import spark.implicits._
val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()
val rc = gpTable.filter(gpTable("source_system_name")==="ORACLE" && gpTable("period_year")==="2017").count()
println("gpTable Count: " + rc)
}
}

现在,我正在获取行数以查看连接是成功还是失败。这是一个巨大的表,它运行速度较慢以获取我理解的计数,因为没有给出分区号和数据分区应该在其上发生的列名的参数。

在很多地方,我看到 jdbc 对象是通过以下方式创建的:

val gpTable2 = spark.read.jdbc(connectionUrl, tableName, connectionProperties) 

我使用 options 以另一种格式创建了它。当使用“选项”形成 jdbc 连接时,我无法理解如何给出 numPartitions,分区列名称,我希望在其上对数据进行分区:val gpTable = spark.read.format("jdbc") .option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()

谁能告诉我

  1. 如何添加参数:numPartitions、lowerBound、upperBound到这样写的jdbc对象:

    val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("密码", devPassword).load()

  2. 如何只添加 columnnamenumPartition 因为我想获取来自年份的所有行:2017,我不想要一个范围要选择的行数(lowerBound,upperBound)

最佳答案

选项 numPartitions、lowerBound、upperBound 和 PartitionColumn 控制 spark 中的并行读取。 PartitionColumn 需要一个完整的列。如果您的表中没有任何合适的列,那么您可以使用 ROW_NUMBER 作为您的分区列。

试一试,

val rowCount = spark.read.format("jdbc").option("url", connectionUrl)
   .option("dbtable","(select count(*) AS count * from tableName where source_system_name = "ORACLE" AND "period_year = "2017")")
.option("user",devUserName)
.option("password",devPassword)
.load()
.collect()
.map(row => row.getAs[Int]("count")).head

我们得到了为提供的谓词返回的行数,可以用作 upperBount。

val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
.option("dbtable","(select ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS RNO, * from tableName source_system_name = "ORACLE" AND "period_year = "2017")")
.option("user",devUserName)
.option("password",devPassword)
.option("numPartitions", 10)
.option("partitionColumn", "RNO")
.option("lowerBound", 1)
.option("upperBound", rowCount)
.load()

numPartitions 取决于与您的 Postgres 数据库的并行连接数。您可以根据从数据库中读取时所需的并行化来调整它。

关于apache-spark - spark-jdbc连接中如何操作numPartitions、lowerBound、upperBound?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51516822/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com