gpt4 book ai didi

postgresql - 如何加速 spark df.write jdbc 到 postgres 数据库?

转载 作者:行者123 更新时间:2023-12-03 14:57:48 25 4
gpt4 key购买 nike

我是 spark 新手,正在尝试使用 df.write 加快将数据帧的内容(可能有 200k 到 2M 行)附加到 postgres 数据库的速度:

df.write.format('jdbc').options(
url=psql_url_spark,
driver=spark_env['PSQL_DRIVER'],
dbtable="{schema}.{table}".format(schema=schema, table=table),
user=spark_env['PSQL_USER'],
password=spark_env['PSQL_PASS'],
batchsize=2000000,
queryTimeout=690
).mode(mode).save()

我尝试增加批量,但这没有帮助,因为完成这项任务仍然需要大约 4 小时。我还在下面包含了来自 aws emr 的一些快照,其中显示了有关作业运行方式的更多详细信息。将数据帧保存到 postgres 表的任务只分配给了一个执行器(我觉得很奇怪),加速这个任务是否涉及在执行器之间分配这个任务?

另外,我已经阅读了 spark's performance tuning docs但增加 batchsize , 和 queryTimeout似乎没有提高性能。 (我尝试在 df.cache() 之前在我的脚本中调用 df.write ,但脚本的运行时间仍然是 4 小时)

此外,我的 aws emr 硬件设置和 spark-submit是:

主节点(1):m4.xlarge

核心节点(2):m5.xlarge
spark-submit --deploy-mode client --executor-cores 4 --num-executors 4 ...

enter image description here

enter image description here

最佳答案

Spark 是分布式数据处理引擎,因此当您处理数据或将其保存在文件系统上时,它会使用其所有执行程序来执行任务。
Spark JDBC 很慢,因为当您建立 JDBC 连接时,其中一个执行程序会建立到目标数据库的链接,从而导致速度缓慢和失败。

为了克服这个问题并加快向数据库写入数据的速度,您需要使用以下方法之一:

方法一:

在这种方法中,您需要使用 postgres COPY 命令实用程序 以加快写操作。这需要您拥有 psycopg2 您的 EMR 集群上的库。

COPY 实用程序的文档是 here

如果您想了解基准差异以及为什么复制更快,请访问 here !

Postgres 还建议使用 COPY 命令进行批量插入。现在如何批量插入 Spark 数据帧。
现在要实现更快的写入,首先将您的 spark 数据帧以 csv 格式保存到 EMR 文件系统,并重新分区您的输出,以便没有文件包含超过 10 万行。

#Repartition your dataframe dynamically based on number of rows in df
df.repartition(10).write.option("maxRecordsPerFile", 100000).mode("overwrite").csv("path/to/save/data)

现在使用 python 读取文件并对每个文件执行复制命令。
import psycopg2    
#iterate over your files here and generate file object you can also get files list using os module
file = open('path/to/save/data/part-00000_0.csv')
file1 = open('path/to/save/data/part-00000_1.csv')

#define a function
def execute_copy(fileName):
con = psycopg2.connect(database=dbname,user=user,password=password,host=host,port=port)
cursor = con.cursor()
cursor.copy_from(fileName, 'table_name', sep=",")
con.commit()
con.close()

为了获得额外的速度提升,由于您使用的是 EMR 集群,您可以利用 python 多处理一次复制多个文件。
from multiprocessing import Pool, cpu_count
with Pool(cpu_count()) as p:
print(p.map(execute_copy, [file,file1]))

这是推荐的方法,因为由于连接限制,无法调整 spark JDBC 以获得更高的写入速度。

方法二:
由于您已经在使用 AWS EMR 集群,因此您始终可以利用 hadoop 功能更快地执行表写入。
所以在这里我们将使用 sqoop export 将我们的数据从 emrfs 导出到 postgres 数据库。
#If you are using s3 as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir s3://mybucket/myinputfiles/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16

#If you are using EMRFS as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir /path/to/save/data/ --driver org.postgresql.Driver --username master --password password --input-null-string '\\N' --input-null-non-string '\\N' --direct -m 16


为什么是sqoop?
因为sqoop根据指定的mapper数量打开与数据库的多个连接。因此,如果您将 -m 指定为 8,那么将有 8 个并发连接流,这些流会将数据写入 postgres。

此外,有关使用 sqoop 的更多信息,请访问此 AWS Blog , SQOOP ConsiderationsSQOOP Documentation .

如果您可以使用代码进行破解,那么方法 1 肯定会给您带来您所寻求的性能提升,如果您对 SQOOP 等 hadoop 组件感到满意,那么请使用第二种方法。

希望能帮助到你!

关于postgresql - 如何加速 spark df.write jdbc 到 postgres 数据库?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58676909/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com