gpt4 book ai didi

python - 仅当有免费 worker 可用时如何生成 future

转载 作者:太空宇宙 更新时间:2023-11-03 18:49:09 25 4
gpt4 key购买 nike

我正在尝试将从大文件行中提取的信息发送到某个服务器上运行的进程。

为了加快速度,我想使用一些并行线程来执行此操作。

使用 concurrent.futures 的 Python 2.7 反向移植我试过这个:

f = open("big_file")
with ThreadPoolExecutor(max_workers=4) as e:
for line in f:
e.submit(send_line_function, line)
f.close()

但是,这是有问题的,因为所有 future 都会立即提交,因此我的机器会耗尽内存,因为完整的文件会加载到内存中。

我的问题是,是否有一种简单的方法可以仅在有免费 worker 可用时提交新的 future。

最佳答案

您可以使用迭代文件 block

for chunk in zip(*[f]*chunksize):

(这是 grouper recipe 的一个应用程序,它将迭代器 f 中的项目收集到大小为 chunksize 的组中。注意:这不会消耗整个文件因为 zip 在 Python3 中返回一个迭代器。)

<小时/>
import concurrent.futures as CF
import itertools as IT
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s %(threadName)s] %(message)s',
datefmt='%H:%M:%S')

def worker(line):
line = line.strip()
logger.info(line)

chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
for chunk in zip(*[f]*chunksize):
futures = [executor.submit(worker, line) for line in chunk]
# wait for these futures to complete before processing another chunk
CF.wait(futures)
<小时/>

现在,您在评论中正确地指出这不是最佳选择。可能有一些 worker 需要很长时间,并且占据了整个工作岗位。

通常,如果每次调用工作程序花费的时间大致相同,那么这并不是什么大问题。然而,这里有一种按需推进文件句柄的方法。它使用 threading.Condition 通知 sprinkler 推进文件句柄。

import logging
import threading
import Queue

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s %(threadName)s] %(message)s',
datefmt='%H:%M:%S')
SENTINEL = object()

def worker(cond, queue):
for line in iter(queue.get, SENTINEL):
line = line.strip()
logger.info(line)
with cond:
cond.notify()
logger.info('notify')

def sprinkler(cond, queue, num_workers):
with open("big_file") as f:
for line in f:
logger.info('advancing filehandle')
with cond:
queue.put(line)
logger.info('waiting')
cond.wait()
for _ in range(num_workers):
queue.put(SENTINEL)

num_workers = 4
cond = threading.Condition()
queue = Queue.Queue()
t = threading.Thread(target=sprinkler, args=[cond, queue, num_workers])
t.start()

threads = [threading.Thread(target=worker, args=[cond, queue])]
for t in threads:
t.start()
for t in threads:
t.join()

关于python - 仅当有免费 worker 可用时如何生成 future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18770534/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com