gpt4 book ai didi

Python:如何将外部队列与 ProcessPoolExecutor 一起使用?

转载 作者:行者123 更新时间:2023-11-28 17:40:01 25 4
gpt4 key购买 nike

我最近开始使用 Python 的多线程和多处理功能。

我尝试编写代码,使用生产者/消费者方法从 JSON 日志文件中读取 block ,将这些 block 作为事件写入队列,然后启动一组将从该队列中轮询事件的进程(文件 block )并处理每一个,打印出结果。

我的意图是首先启动进程,让它们等待事件开始进入队列。

我目前正在使用这段代码,它似乎可以工作,使用我发现的示例中的一些点点滴滴:

import re, sys
from multiprocessing import Process, Queue

def process(file, chunk):
f = open(file, "rb")
f.seek(chunk[0])
for entry in pat.findall(f.read(chunk[1])):
print(entry)

def getchunks(file, size=1024*1024):
f = open(file, "rb")
while True:
start = f.tell()
f.seek(size, 1)
s = f.readline() # skip forward to next line ending
yield start, f.tell() - start
if not s:
break

def processingChunks(queue):
while True:
queueEvent = queue.get()
if (queueEvent == None):
queue.put(None)
break
process(queueEvent[0], queueEvent[1])

if __name__ == "__main__":
testFile = "testFile.json"
pat = re.compile(r".*?\n")
queue = Queue()

for w in xrange(6):
p = Process(target=processingChunks, args=(queue,))
p.start()

for chunk in getchunks(testFile):
queue.put((testFile, chunk))
print(queue.qsize())
queue.put(None)

但是,我想学习如何使用 concurrent.futures ProcessPoolExecutor 以异步方式使用 Future 结果对象实现相同的结果。

我的第一次尝试是使用一个外部队列,它是用多处理管理器创建的,我会将其传递给进程进行轮询。

但是这似乎不起作用,我认为这可能不是 ProcessPoolExecutor 设计的工作方式,因为它似乎使用它自己的内部队列。

我使用了这段代码:

import concurrent
from concurrent.futures import as_completed
import re, sys
from multiprocessing import Lock, Process, Queue, current_process, Pool, Manager

def process(file, chunk):
entries = []
f = open(file, "rb")
f.seek(chunk[0])
for entry in pat.findall(f.read(chunk[1])):
entries.append(entry)
return entries

def getchunks(file, size=1024*1024):
f = open(file, "rb")
while True:
start = f.tell()
f.seek(size, 1)
s = f.readline() # skip forward to next line ending
yield start, f.tell() - start
if not s:
break

def processingChunks(queue):
while True:
queueEvent = queue.get()
if (queueEvent == None):
queue.put(None)
break
return process(queueEvent[0], queueEvent[1])

if __name__ == "__main__":
testFile = "testFile.json"
pat = re.compile(r".*?\n")
procManager = Manager()
queue = procManager.Queue()

with concurrent.futures.ProcessPoolExecutor(max_workers = 6) as executor:
futureResults = []
for i in range(6):
future_result = executor.submit(processingChunks, queue)
futureResults.append(future_result)

for complete in as_completed(futureResults):
res = complete.result()
for i in res:
print(i)


for chunk in getchunks(testFile):
queue.put((testFile, chunk))
print(queue.qsize())
queue.put(None)

我无法用这个获得任何结果,所以显然我做错了什么,而且我不理解这个概念。

你们能帮我理解如何实现吗?

最佳答案

感谢 Blckknght,他的回复将我推向了正确的方向。这是我最初问题的可能解决方案:

#!/usr/bin/python
import concurrent
from concurrent.futures import as_completed
import re, sys

def process(event):
entries = []
fl = event[0]
chunk = event[1]
pat = event[2]
f = open(fl, "rb")
f.seek(chunk[0])
for entry in pat.findall(f.read(chunk[1])):
entries.append(entry)
return entries

def getchunks(file, pat, size=1024*1024):
f = open(file, "rb")
while True:
start = f.tell()
f.seek(size, 1)
s = f.readline() # skip forward to next line ending
yield (file, (start, f.tell() - start), pat)
if not s:
break

if __name__ == "__main__":
testFile = "testFile.json"
pat = re.compile(r".*?\n")
results = []

with concurrent.futures.ProcessPoolExecutor() as executor:
for res in (executor.submit(process, event) for event in getchunks(testFile, pat)):
results.append(res)

for complete in as_completed(results):
for entry in complete.result():
print('Event result: %s' % entry)

关于Python:如何将外部队列与 ProcessPoolExecutor 一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25763627/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com