gpt4 book ai didi

python - 使用多处理锁定Python写入文件时丢失行

转载 作者:太空宇宙 更新时间:2023-11-03 16:19:27 28 4
gpt4 key购买 nike

这是我的代码:

from multiprocessing import Pool, Lock
from datetime import datetime as dt

console_out = "/STDOUT/Console.out"
chunksize = 50
lock = Lock()

def writer(message):
lock.acquire()
with open(console_out, 'a') as out:
out.write(message)
out.flush()
lock.release()

def conf_wrapper(state):
import ProcessingModule as procs
import sqlalchemy as sal

stcd, nrows = state
engine = sal.create_engine('postgresql://foo:bar@localhost:5432/schema')

writer("State {s} started at: {n}"
"\n".format(s=str(stcd).zfill(2), n=dt.now()))

with engine.connect() as conn, conn.begin():
procs.processor(conn, stcd, nrows, chunksize)

writer("\tState {s} finished at: {n}"
"\n".format(s=str(stcd).zfill(2), n=dt.now()))

def main():
nprocesses = 12
maxproc = 1
state_list = [(2, 113), (10, 119), (15, 84), (50, 112), (44, 110), (11, 37), (33, 197)]

with open(console_out, 'w') as out:
out.write("Starting at {n}\n".format(n=dt.now()))
out.write("Using {p} processes..."
"\n".format(p=nprocesses))

with Pool(processes=int(nprocesses), maxtasksperchild=maxproc) as pool:
pool.map(func=conf_wrapper, iterable=state_list, chunksize=1)

with open(console_out, 'a') as out:
out.write("\nAll done at {n}".format(n=dt.now()))

文件 console_out 中永远不会包含所有 7 个状态。它总是会错过一个或多个状态。这是最新运行的输出:

Starting at 2016-07-27 21:46:58.638587
Using 12 processes...
State 44 started at: 2016-07-27 21:47:01.482322
State 02 started at: 2016-07-27 21:47:01.497947
State 11 started at: 2016-07-27 21:47:01.529198
State 10 started at: 2016-07-27 21:47:01.497947
State 11 finished at: 2016-07-27 21:47:15.701207
State 15 finished at: 2016-07-27 21:47:24.123164
State 44 finished at: 2016-07-27 21:47:32.029489
State 50 finished at: 2016-07-27 21:47:51.203107
State 10 finished at: 2016-07-27 21:47:53.046876
State 33 finished at: 2016-07-27 21:47:58.156301
State 02 finished at: 2016-07-27 21:48:18.856979

All done at 2016-07-27 21:48:18.992277

为什么?

注意,操作系统是Windows Server 2012 R2。

最佳答案

由于您在 Windows 上运行,因此工作进程不会继承任何内容。每个进程“从头开始”运行整个主程序。

特别是,根据编写的代码,每个进程都有自己的lock实例,并且这些实例彼此无关。简而言之,lock 根本不提供任何进程间互斥。

要解决此问题,可以将 Pool 构造函数更改为调用每个进程一次的初始化函数,并向该函数传递 Lock() 的实例。例如,像这样:

def init(L):
global lock
lock = L

然后将这些参数添加到 Pool() 构造函数中:

initializer=init, initargs=(Lock(),),

您不再需要:

lock = Lock()

线。

然后进程间互斥将按预期工作。

没有锁

如果您想将所有输出委托(delegate)给编写器进程,您可以跳过锁定并使用队列来代替该进程[稍后会看到不同的版本]。

def writer_process(q):
with open(console_out, 'w') as out:
while True:
message = q.get()
if message is None:
break
out.write(message)
out.flush() # can't guess whether you really want this

并将 writer() 更改为:

def writer(message):
q.put(message)

您将再次需要在 Pool 构造函数中使用 initializer=initargs= ,以便所有进程都使用相同 队列。

只有一个进程应该运行 writer_process(),并且可以作为 multiprocessing.Process 的实例自行启动。

最后,让 writer_process() 知道是时候退出了,当它排空队列并返回时,只需运行

q.put(None)

在主进程中。

稍后

OP 选择了这个版本,因为他们需要同时在其他代码中打开输出文件:

def writer_process(q):
while True:
message = q.get()
if message == 'done':
break
else:
with open(console_out, 'a') as out:
out.write(message)

我不知道为什么终止哨兵被更改为“done”。任何独特的值都适用于此; 没有是传统的。

关于python - 使用多处理锁定Python写入文件时丢失行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38626817/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com