gpt4 book ai didi

python - 如何并行对 pandas 数据帧执行多个 SQL 查询

转载 作者:太空宇宙 更新时间:2023-11-03 11:31:45 24 4
gpt4 key购买 nike

大家好,Python Pandas 大师们。我正在寻找一种用 Python 并行运行多条 SQL 查询、并返回多个 Pandas 数据帧的方法。我有类似于下面的代码,它针对 MS SQL Server 数据库依次运行 4 个 SQL 查询。其中两个查询的执行时间远长于获取结果所需的 IO(网络)时间,因此我认为并行化会使代码运行速度提高约 2 倍。是否有一种简单的方法来并行执行这些查询?

理想情况下,我希望能够读取项目子目录中的所有 *.sql 文件,然后并行触发这些查询,并以易于使用的格式(列表?)返回数据帧,以便进行进一步的操作(索引、连接、聚合)。

提前致谢,兰德尔

# imports
import ceODBC
import numpy as np
import pandas as pd
import pandas.io.sql as psql
from ConfigParser import ConfigParser
import os
import glob

# db connection string (fill in <servername> and <dname>)
cnxn_string = 'DRIVER={SQL Server Native Client 11.0}; SERVER=<servername>; DATABASE=<dname>; Trusted_Connection=Yes'

# directories (also should be moved to config)
dataDir = os.getcwd() + '\\data\\'
sqlDir = os.getcwd() + '\\sql\\'

# read sql from external .sql files. Possible to read all *.sql files in a sql dir into a list (or other structure...)?
with open(sqlDir + 'q1.sql', 'r') as f:
    q1sql = f.read()
with open(sqlDir + 'q2.sql', 'r') as f:
    q2sql = f.read()
with open(sqlDir + 'q3.sql', 'r') as f:
    q3sql = f.read()
with open(sqlDir + 'q4.sql', 'r') as f:
    q4sql = f.read()

# Connect to db, run SQL, assign each result into a dataframe, close connection.
# The connection gets its own name so the connection string is not overwritten.
cnxn = ceODBC.connect(cnxn_string)
try:
    # execute the queries one after another on the same connection. Parallelize?
    df1 = psql.frame_query(q1sql, cnxn)
    df2 = psql.frame_query(q2sql, cnxn)
    df3 = psql.frame_query(q3sql, cnxn)
    df4 = psql.frame_query(q4sql, cnxn)
finally:
    # close connection, even if one of the queries raised
    cnxn.close()

最佳答案

在 N 个线程中使用 N 个连接,然后 join 这些线程(等待它们全部结束)并处理结果。

# imports
import ceODBC
import numpy as np
import pandas as pd
import pandas.io.sql as psql
from ConfigParser import ConfigParser
import os
import glob
import threading
# (stray "enter code here" editor placeholder removed)


# db connection string (fill in <servername> and <dname>)
cnxn_string = 'DRIVER={SQL Server Native Client 11.0}; SERVER=<servername>; DATABASE=<dname>; Trusted_Connection=Yes'

# directories (also should be moved to config)
dataDir = os.getcwd() + '\\data\\'
sqlDir = os.getcwd() + '\\sql\\'

# variable to store results; task() writes one entry per SQL file, keyed by file name
responses = {}
# lock taken around every write to `responses`
responses_lock = threading.Lock()

# upper bound on the number of DB connections open at the same time
maxconnections = 8
# BoundedSemaphore must be qualified: only the `threading` module itself is imported
pool_sema = threading.BoundedSemaphore(value=maxconnections)


def task(fname):
    """Run the SQL stored in the file *fname* and record the resulting dataframe.

    The query text is read from *fname*, executed on a connection opened just
    for this call, and the result is stored in the module-level ``responses``
    dict under the key *fname*. Used as the target of a ``threading.Thread``.

    Any exception raised while reading the file, connecting or querying
    propagates to the caller; the connection, the semaphore slot and the lock
    are released in every case.
    """
    with open(fname, 'r') as f:
        sql = f.read()

    # Connect to db, run SQL, assign result into dataframe, close connection.
    # The semaphore limits the number of connections open at the same time.
    pool_sema.acquire()
    try:
        cnxn = ceODBC.connect(cnxn_string)
        try:
            # NOTE: frame_query is the legacy pandas API; newer pandas
            # releases provide read_sql instead.
            df = psql.frame_query(sql, cnxn)
        finally:
            # close connection, even if the query raised
            cnxn.close()
    finally:
        # always give the slot back, otherwise every failed query would
        # permanently reduce the number of usable connections
        pool_sema.release()

    # to ensure that only one thread at a time can modify the global variable
    responses_lock.acquire()
    try:
        responses[fname] = df
    finally:
        responses_lock.release()


pool = []

# find sql files and spawn threads, one thread per *.sql file in sqlDir
for fname in glob.glob(os.path.join(sqlDir, '*.sql')):
    # create new thread with task
    thread = threading.Thread(target=task, args=(fname,))
    thread.daemon = True
    # store thread in pool so it can be joined later
    pool.append(thread)
    # thread started
    thread.start()

# wait until the tasks of all threads are done
for thread in pool:
    thread.join()

# results of each execution are now stored in the responses dict, keyed by file name

每个文件在单独的线程中执行。结果存储在一个变量中。

使用 with 语句的等价函数:

def task(fname):
    """Run the SQL stored in the file *fname* and record the resulting dataframe.

    Variant that uses ``with`` statements for the semaphore and the lock.
    The result is stored in the module-level ``responses`` dict under the
    key *fname*. Exceptions propagate to the caller; the connection is
    closed and the semaphore and lock are released in every case.
    """
    with open(fname, 'r') as f:
        sql = f.read()

    # Connect to db, run SQL, assign result into dataframe, close connection.
    # The semaphore limits the number of connections open at the same time;
    # the with statement releases it on exit, including on error.
    with pool_sema:
        cnxn = ceODBC.connect(cnxn_string)
        try:
            # NOTE: frame_query is the legacy pandas API; newer pandas
            # releases provide read_sql instead.
            df = psql.frame_query(sql, cnxn)
        finally:
            # close connection, even if the query raised
            cnxn.close()

    # to ensure that only one thread at a time can modify the global variable
    with responses_lock:
        responses[fname] = df

multiprocessing.Pool 很容易分发繁重的任务,但它本身有更多的 IO 操作。

关于python - 如何并行对 pandas 数据帧执行多个 SQL 查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17913754/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com