- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试将大约 100 万行从 PostgreSQL 数据库加载到 Spark 中。使用 Spark 时大约需要 10 秒。但是,使用 psycopg2 驱动程序加载相同的查询需要 2 秒。我正在使用 postgresql jdbc 驱动程序版本 42.0.0
def _loadFromPostGres(name):
url_connect = "jdbc:postgresql:"+dbname
properties = {"user": "postgres", "password": "postgres"}
df = SparkSession.builder.getOrCreate().read.jdbc(url=url_connect, table=name, properties=properties)
return df
df = _loadFromPostGres("""
(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298) as
user_series_game""")
print measure(lambda : len(df.collect()))
输出是-
--- 10.7214591503 seconds ---
1076131
使用 psycopg2 -
import psycopg2
conn = psycopg2.connect(conn_string)
cur = conn.cursor()
def _exec():
cur.execute("""(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298)""")
return cur.fetchall()
print measure(lambda : len(_exec()))
cur.close()
conn.close()
输出是-
--- 2.27961301804 seconds ---
1076131
测量函数-
def measure(func) :
start_time = time.time()
x = func()
print("--- %s seconds ---" % (time.time() - start_time))
return x
请帮我找出这个问题的原因。
编辑 1
我做了更多的基准测试。使用 Scala 和 JDBC -
import java.sql._;
import scala.collection.mutable.ArrayBuffer;
def exec() {
val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+
"?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")
val conn = DriverManager.getConnection(url,"postgres","postgres");
val sqlText = """SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298"""
val t0 = System.nanoTime()
val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
val rs = stmt.executeQuery()
val list = new ArrayBuffer[(Long, Long, Long, Double)]()
while (rs.next()) {
val seriesId = rs.getLong("seriesId")
val companyId = rs.getLong("companyId")
val userId = rs.getLong("userId")
val score = rs.getDouble("score")
list.append((seriesId, companyId, userId, score))
}
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")
println(list.size)
rs.close()
stmt.close()
conn.close()
}
exec()
输出是-
Elapsed time: 1.922102285s
1143402
当我在 Spark + Scala 中执行 collect() 时 -
import org.apache.spark.sql.SparkSession
def exec2() {
val spark = SparkSession.builder().getOrCreate()
val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+
"?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")
val sqlText = """(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298) as user_series_game"""
val t0 = System.nanoTime()
val df = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", sqlText)
.option("user", "postgres")
.option("password", "postgres")
.load()
val list = df.collect()
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")
print (list.size)
}
exec2()
输出是
Elapsed time: 1.486141076s
1143445
因此在 Python 序列化中花费了 4 倍的额外时间。我知道会有一些惩罚,但这似乎太多了。
最佳答案
原因很简单,同时有两个原因。
首先,我将向您介绍 psycopg2
的工作原理。
这个库 psycopg2
像任何其他库一样连接到 RDMS。该库会将查询发送到您的 postgres 引擎,并将数据返回给您。像这样直接前进。
Conn -> Query -> ReturnData -> FetchData
当您使用 spark 时,有两种方式略有不同。 Spark 不像一种在单个线程中运行的编程语言。它有一个分布式系统来工作。即使你在本地机器上运行。请参阅 Spark 具有 Driver(Master) 和 Workers 的基本概念。
Driver 接收请求执行对 Postgres 的查询,Driver 不会请求每个 worker 从你的 Postgres 请求信息的数据。
如果您看到文档 here你会看到这样的注释:
Don’t create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.
这个注释意味着每个工作人员将负责为您的 postgres 请求数据。这是开始这个过程的一个小开销,但没什么大不了的。但是这里有一个开销,将数据发送给每个工作人员。
第二点,你在这部分代码中的收集:
print measure(lambda : len(df.collect()))
collect 函数将发送一条命令,让您的所有工作人员将数据发送到您的驱动程序。要存储在驱动程序的内存中,它就像一个 Reduce,它会在进程中间创建 Shuffle。洗牌是将数据发送给其他工作人员的过程的步骤。在收集的情况下,每个 worker 都会将其发送给您的司机。
那么你代码的JDBC中Spark的步骤是:
(Workers)Conn -> (Workers)Query -> (Workers)FetchData -> (Driver) Request the Data -> (Workers) Shuffle -> (Driver) Collect
好吧,Spark 还发生了很多其他事情,比如 QueryPlan、构建 DataFrame 和其他事情。
这就是您在简单的 Python 代码中比 Spark 响应更快的原因。
关于postgresql - Spark 从 Postgres JDBC 表读取速度慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43533751/
我想知道这里是否有人有安装 Postgres-XL 的经验,新的开源多线程版本的 PostgreSQL。我计划将一组 1-2 TB 的数据库从常规 Postgres 9.3 迁移到 XL,并且想知道这
我想创建一个 postgres 备份脚本,但我不想使用 postgres 用户,因为我所在的 unix 系统几乎没有限制。我想要做的是在 crontab 上以 unix 系统(网络)的普通用户身份运行
我正在尝试编写一个 node-postgres 查询,它采用一个整数作为参数在间隔中使用: const query = { text: `SELECT foo
如何在不使用 gui 的情况下停止特定的 Postgres.app 集群。 我想使用 bash/Terminal.app 而不是 gui 我还应该指出,Postgres 应用程序有一个这样的菜单 如果
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 9 年前。 Improve
我正在使用 docker 运行 Postgres 图像。它曾经在 Windows10 和 Ubuntu 18.04 上运行没有任何问题。 在 Ubuntu 系统上重新克隆项目后,它在运行 docker
我正在使用 python(比如表 A)将批处理 csv 文件加载到 postgres。我正在使用 pandas 将数据上传到更快的 block 中。 for chunk in pd.read_csv(
所以是的,标题说明了一切,我需要以某种方式将 DB 从源服务器获取到新服务器,但更重要的是旧服务器正在崩溃 :P 有什么方法可以将它全部移动到新服务器并导入它? 旧服务器只是拒绝再运行 Postgre
这主要是出于好奇而提出的问题。我正在浏览 Postgres systemd 单元文件,以了解 systemd 可以做什么。 Postgres 有两个 systemd 单元文件。一个用于代替 syste
从我在 pg_hba.conf 中读到的内容,我推断,为了确保提示我输入 postgres 用户的密码,我应该从当前的“对等”编辑 pg_hba.conf 的前两个条目的方法'到'密码'或'md5',
我已连接到架构 apm。 尝试执行函数并出现以下错误: ERROR: user mapping not found for "postgres" 数据库连接信息说: apm on postgres@
我在 ubuntu 12.04 服务器上,我正在尝试安装 postgresql。截至目前,我已成功安装它但无法配置它。我需要创建一个角色才能继续前进,我在终端中运行了这个命令: root@hostna
我无法以“postgres”用户身份登录到“postgres”数据库。操作系统:REHL 服务器版本 6.3PostgreSQL 版本:8.4有一个数据库“jiradb”用作 JIRA 6.0.8 的
我正在尝试将现有数据库导入 postgres docker 容器。 这就是我的处理方式: docker run --name pg-docker -e POSTGRES_PASSWORD=*****
我们的 Web 应用程序在 postgres 9.3 和 Grails 2.5.3 上运行。当我们重新启动 postgres (/etc/init.d/postgresql restart) 并访问网
我想构建 postgres docker 容器来测试一些问题。我有: postgres 文件的归档文件夹(/var/lib/postgres/data/) 将文件夹放入 docker postgres
我有一个名为“stuff”的表,其中有一个名为“tags”的 json 列,用于存储标签列表,还有一个名为“id”的列,它是表中每一行的主键。我正在使用 postgres 数据库。例如,一行看起来像这
我对 sqlalchemy-psql 中的锁定机制是如何工作的感到非常困惑。我正在运行一个带有 sqlalchemy 和 postgres 的 python-flask 应用程序。由于我有多个线程处理
我(必须)使用 Postgres 8.4 数据库。在这个数据库中,我创建了一个函数: CREATE OR REPLACE FUNCTION counter (mindate timestamptz,m
我已经使用 PostgreSQL 几天了,它运行良好。我一直在通过默认的 postgres 数据库用户和另一个具有权限的用户使用它。 今天中午(在一切正常之后)它停止工作,我再也无法回到数据库中。我会
我是一名优秀的程序员,十分优秀!