gpt4 book ai didi

multithreading - 使用 Python 和多线程处理巨大的 CSV 文件

转载 作者:行者123 更新时间:2023-12-04 23:14:55 25 4
gpt4 key购买 nike

我有一个函数可以懒惰地从一个巨大的 CSV 文件中生成行:

def get_next_line():
with open(sample_csv,'r') as f:
for line in f:
yield line

def do_long_operation(row):
print('Do some operation that takes a long time')

我需要使用线程,以便从上述函数中获得的每条记录都可以调用 do_long_operation

互联网上的大多数地方都有这样的例子,我不太确定我是否走在正确的道路上。
import threading
thread_list = []
for i in range(8):
t = threading.Thread(target=do_long_operation, args=(get_next_row from get_next_line))
thread_list.append(t)

for thread in thread_list:
thread.start()

for thread in thread_list:
thread.join()

我的问题是:
  • 如何只启动有限数量的线程,比如 8 个?
  • 如何确保每个线程都从 get_next_line 获得一行?
  • 最佳答案

    您可以使用 multiprocessing 中的线程池并将您的任务映射到工作线程池:

    from multiprocessing.pool import ThreadPool as Pool
    # from multiprocessing import Pool
    from random import randint
    from time import sleep


    def process_line(l):
    print l, "started"
    sleep(randint(0, 3))
    print l, "done"


    def get_next_line():
    with open("sample.csv", 'r') as f:
    for line in f:
    yield line

    f = get_next_line()

    t = Pool(processes=8)

    for i in f:
    t.map(process_line, (i,))
    t.close()
    t.join()

    这将创建八名工作人员并将您的行一一提交给他们。一旦进程“空闲”,它就会被分配一个新任务。

    还有一个注释掉的导入语句。如果您注释掉 ThreadPool 并从 multiprocessing 导入 o​​jit_code,您将获得子进程而不是线程,这在您的情况下可能更有效。

    关于multithreading - 使用 Python 和多线程处理巨大的 CSV 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44950893/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com