gpt4 book ai didi

Python,如何实现并行过程

转载 作者:行者123 更新时间:2023-11-29 06:15:47 25 4
gpt4 key购买 nike

在Python中,如果数据库很大,一个简单的select查询会耗费很多时间。我有一个包含 4,700,000 条记录的表,如果我使用 SELECT * FROM MY_TABLE 获取表中的所有数据,将需要 18 分钟。通过设置chunk_size并实现并行查询,可以节省时间。

所以,我的代码是:

import os
import time
import multiprocessing
import pandas as pd
import MySQLdb as mysql

if __name__ == '__main__':
conn = mysql.connect(host='192.168.0.114',
user='root',
passwd='fit123456',
db='A_stock_day',
charset='utf8'
)
limit = 100000
offset = 0
dfs = []
print 'start.....'
_s = time.time()
while True:
_query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' %\
(limit, offset)
dfs.append(pd.read_sql(_query, conn))
offset += limit
if len(dfs[-1]) < limit:
break
_e = time.time()
print 'Time: ', _e - _s
full_df = pd.concat(dfs)

但是,它仍然需要大约 10 分钟。如何并行化,让多个线程同时运行,使执行时间缩短为一个线程的执行时间?我在这里有多处理代码:

def select(info):
""""""
limit, offset, conn = info[0], info[1], info[2]
_query = 'SELECT * FROM A_stock_basic LIMIT %d OFFSET %d' %\
(limit, offset)
s = time.time()
info[3].append(pd.read_sql(_query, conn))
e = time.time()
print 'time: ', e - s, ' pid: ', os.getpid()

if __name__ == '__main__':
conn = mysql.connect(host='192.168.0.114',
user='root',
passwd='fit123456',
db='A_stock_day',
charset='utf8'
)
dfs, p, pool= [], [], multiprocessing.Pool(7)
info = [(1000000, 0, conn, dfs),
(1000000, 1000000, conn, dfs),
(1000000, 2000000, conn, dfs),
(1000000, 3000000, conn, dfs),
(1000000, 4000000, conn, dfs),
(1000000, 5000000, conn, dfs),
(1000000, 6000000, conn, dfs),
]
for _i, _v in enumerate(info):
print 'start....', _i
_p = multiprocessing.Process(target=select, args=(_v, ))
_p.start()
_p.join()
print 'The End'

可以看到,虽然启动了multiprocessing,但是一次只有一个进程读取数据库。所以,这只是多处理,而不是并行处理。

如何实现并行多处理以节省时间?谢谢。

最佳答案

在你的循环中

for _i, _v in enumerate(info):
print 'start....', _i
_p = multiprocessing.Process(target=select, args=(_v, ))
_p.start()
_p.join()

您正在启动进程,然后立即加入它们。这意味着您的主流程永远不会启动多个额外的子流程(因为一旦启动一个子流程,它就会等待该子流程完成,然后再继续)。

解决这个问题最直接的方法是:

processes = []
for _i, _v in enumerate(info):
print 'start....', _i
_p = multiprocessing.Process(target=select, args=(_v, ))
_p.start()
processes.append(_p)
for _p in processes:
_p.join()

但是,更好的方法是使用您已经创建的 pool 对象。为此,代码应该类似于

pool.apply(select, info)

但是,我认为您更愿意让 select 返回它获取的数据(而不是将其附加到数组)并调用 pool.map 而不是 pool.apply。这应该有助于避免一些我认为您可能会遇到的竞争条件和共享内存问题。

您可以在 https://docs.python.org/2/library/multiprocessing.html 阅读有关这些功能的更多信息,尽管我希望您已经去过那里。

关于Python,如何实现并行过程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36028421/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com