gpt4 book ai didi

postgresql - Spark 从 Postgres JDBC 表读取速度慢

转载 作者:行者123 更新时间:2023-11-29 11:25:32 25 4
gpt4 key购买 nike

我正在尝试将大约 100 万行从 PostgreSQL 数据库加载到 Spark 中。使用 Spark 时大约需要 10 秒。但是,使用 psycopg2 驱动程序加载相同的查询需要 2 秒。我正在使用 postgresql jdbc 驱动程序版本 42.0.0

def _loadFromPostGres(name):
url_connect = "jdbc:postgresql:"+dbname
properties = {"user": "postgres", "password": "postgres"}
df = SparkSession.builder.getOrCreate().read.jdbc(url=url_connect, table=name, properties=properties)
return df

df = _loadFromPostGres("""
(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298) as
user_series_game""")

print measure(lambda : len(df.collect()))

输出是-

--- 10.7214591503 seconds ---
1076131

使用 psycopg2 -

import psycopg2
conn = psycopg2.connect(conn_string)
cur = conn.cursor()

def _exec():
cur.execute("""(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298)""")
return cur.fetchall()
print measure(lambda : len(_exec()))
cur.close()
conn.close()

输出是-

--- 2.27961301804 seconds ---
1076131

测量函数-

def measure(func) :
start_time = time.time()
x = func()
print("--- %s seconds ---" % (time.time() - start_time))
return x

请帮我找出这个问题的原因。


编辑 1

我做了更多的基准测试。使用 Scala 和 JDBC -

import java.sql._;
import scala.collection.mutable.ArrayBuffer;

def exec() {

val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+
"?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

val conn = DriverManager.getConnection(url,"postgres","postgres");

val sqlText = """SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298"""

val t0 = System.nanoTime()

val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

val rs = stmt.executeQuery()

val list = new ArrayBuffer[(Long, Long, Long, Double)]()

while (rs.next()) {
val seriesId = rs.getLong("seriesId")
val companyId = rs.getLong("companyId")
val userId = rs.getLong("userId")
val score = rs.getDouble("score")
list.append((seriesId, companyId, userId, score))
}

val t1 = System.nanoTime()

println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

println(list.size)

rs.close()
stmt.close()
conn.close()
}

exec()

输出是-

Elapsed time: 1.922102285s
1143402

当我在 Spark + Scala 中执行 collect() 时 -

import org.apache.spark.sql.SparkSession

def exec2() {

val spark = SparkSession.builder().getOrCreate()

val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+
"?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

val sqlText = """(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298) as user_series_game"""

val t0 = System.nanoTime()

val df = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", sqlText)
.option("user", "postgres")
.option("password", "postgres")
.load()

val list = df.collect()

val t1 = System.nanoTime()

println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

print (list.size)
}

exec2()

输出是

Elapsed time: 1.486141076s
1143445

因此在 Python 序列化中花费了 4 倍的额外时间。我知道会有一些惩罚,但这似乎太多了。

最佳答案

原因很简单,同时有两个原因。

首先,我将向您介绍 psycopg2 的工作原理。

这个库 psycopg2 像任何其他库一样连接到 RDMS。该库会将查询发送到您的 postgres 引擎,并将数据返回给您。像这样直接前进。

Conn -> Query -> ReturnData -> FetchData

当您使用 spark 时,有两种方式略有不同。 Spark 不像一种在单个线程中运行的编程语言。它有一个分布式系统来工作。即使你在本地机器上运行。请参阅 Spark 具有 Driver(Master) 和 Workers 的基本概念。

Driver 接收请求执行对 Postgres 的查询,Driver 不会请求每个 worker 从你的 Postgres 请求信息的数据。

如果您看到文档 here你会看到这样的注释:

Don’t create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.

这个注释意味着每个工作人员将负责为您的 postgres 请求数据。这是开始这个​​过程的一个小开销,但没什么大不了的。但是这里有一个开销,将数据发送给每个工作人员。

第二点,你在这部分代码中的收集:

print measure(lambda : len(df.collect()))

collect 函数将发送一条命令,让您的所有工作人员将数据发送到您的驱动程序。要存储在驱动程序的内存中,它就像一个 Reduce,它会在进程中间创建 Shuffle。洗牌是将数据发送给其他工作人员的过程的步骤。在收集的情况下,每个 worker 都会将其发送给您的司机。

那么你代码的JDBC中Spark的步骤是:

(Workers)Conn -> (Workers)Query -> (Workers)FetchData -> (Driver) Request the Data -> (Workers) Shuffle -> (Driver) Collect

好吧,Spark 还发生了很多其他事情,比如 QueryPlan、构建 DataFrame 和其他事情。

这就是您在简单的 Python 代码中比 Spark 响应更快的原因。

关于postgresql - Spark 从 Postgres JDBC 表读取速度慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43533751/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com