
Python: using sqlite3 with multiprocessing

Reposted · Author: IT王子 · Updated: 2023-10-29 06:28:53

I have an SQLite3 database. I need to parse 10,000 files. I read some data from each file and then query the database with that data to get a result. My code works fine in a single-process environment, but I get an error when trying to use a multiprocessing pool.

My approach without multiprocessing (works OK):
1. Open DB connection object
2. for f in files:
       foo(f, x1=x1, x2=x2, ..., db=DB)
3. Close DB

My approach with multiprocessing (does NOT work):
1. Open DB
2. pool = multiprocessing.Pool(processes=4)
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), [files])
4. pool.close()
5. Close DB

I get the following error: sqlite3.ProgrammingError: Base Cursor.__init__ not called.
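A likely explanation (my note, not part of the original post): multiprocessing must pickle every argument it sends to a worker process, and a live sqlite3 connection holds OS-level state that cannot survive that trip, so the copy the worker receives is broken. A minimal check, assuming CPython 3:

```python
import pickle
import sqlite3

# An open Connection wraps an OS file handle, so it cannot be
# serialized and shipped to a child process.
conn = sqlite3.connect(":memory:")
try:
    pickle.dumps(conn)
    picklable = True
except TypeError:
    picklable = False

print(picklable)  # False
```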

My database class is implemented as follows:

import logging
import sqlite3
import sys

log = logging.getLogger(__name__)

def open_db(sqlite_file):
    """Open SQLite database connection.

    Args:
        sqlite_file -- File path

    Return:
        Connection
    """
    log.info('Open SQLite database %s', sqlite_file)
    try:
        conn = sqlite3.connect(sqlite_file)
    except sqlite3.Error as e:
        log.error('Unable to open SQLite database %s', e.args[0])
        sys.exit(1)
    return conn

def close_db(conn, sqlite_file):
    """Close SQLite database connection.

    Args:
        conn -- Connection
    """
    if conn:
        log.info('Close SQLite database %s', sqlite_file)
        conn.close()

class MapDB:

    def __init__(self, sqlite_file):
        """Initialize.

        Args:
            sqlite_file -- File path
        """
        # 1. Open database.
        # 2. Setup to receive data as dict().
        # 3. Get cursor to execute queries.
        self._sqlite_file = sqlite_file
        self._conn = open_db(sqlite_file)
        self._conn.row_factory = sqlite3.Row
        self._cursor = self._conn.cursor()

    def close(self):
        """Close DB connection."""
        if self._cursor:
            self._cursor.close()
        close_db(self._conn, self._sqlite_file)

    def check(self):
        ...

    def get_driver_net(self, net):
        ...

    def get_cell_id(self, net):
        ...

The function foo() looks like this:

def foo(f, x1, x2, db):
    # extract some data from file f
    r1 = db.get_driver_net(...)
    r2 = db.get_cell_id(...)

The overall (non-working) implementation is:

mapdb = MapDB(sqlite_file)

log.info('Create NetInfo objects')
pool = multiprocessing.Pool(processes=4)
files = [get list of files to process]
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)
pool.close()
mapdb.close()

To fix this, I think I need to create the MapDB() object inside each pool worker (so there are 4 parallel/independent connections). But I am not sure how to do that. Can anyone show me an example of how to accomplish this with Pool?

Best Answer

How about defining foo like this:

def foo(f, x1, x2, db_path):
    mapdb = MapDB(db_path)   # open a fresh connection in this worker
    ... process data ...
    mapdb.close()

Then change your pool.map call to:

pool.map(functools.partial(foo, x1=x1, x2=x2, db_path="path-to-sqlite3-db"), files)    

Update

Another option is to manage the worker threads yourself and distribute the work through a Queue.

from queue import Queue   # Python 2: from Queue import Queue
from threading import Thread

q = Queue()

def worker():
    mapdb = ...open the sqlite database
    while True:
        item = q.get()
        if item[0] == "file":
            file = item[1]
            ... process file ...
            q.task_done()
        else:
            q.task_done()
            break
    ...close sqlite connection...

# Start up the workers

nworkers = 4

for i in range(nworkers):
    t = Thread(target=worker)   # don't shadow the worker() function itself
    t.daemon = True
    t.start()

# Place work on the Queue

for x in ...list of files...:
    q.put(("file", x))

# Place termination tokens onto the Queue

for i in range(nworkers):
    q.put(("end",))

# Wait for all work to be done.

q.join()

The termination tokens are there to make sure the sqlite connections get closed - just in case.

Regarding Python: using sqlite3 with multiprocessing, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37199897/
