我是 Python 和机器学习的初学者。我正在尝试使用多线程重现 countvectorizer()
的代码。我正在使用 yelp 数据集使用 LogisticRegression
进行情绪分析。这是我到目前为止写的:
代码片段:
from multiprocessing.dummy import Pool as ThreadPool
from threading import Thread, current_thread
from functools import partial
data = df['text']
rev = df['stars']
y = []
def product_helper(args):
return featureExtraction(*args)
def featureExtraction(p,t):
temp = [0] * len(bag_of_words)
for word in p.split():
if word in bag_of_words:
temp[bag_of_words.index(word)] += 1
return temp
# function to be mapped over
def calculateParallel(threads):
pool = ThreadPool(threads)
job_args = [(item_a, rev[i]) for i, item_a in enumerate(data)]
l = pool.map(product_helper,job_args)
pool.close()
pool.join()
return l
temp_X = calculateParallel(12)
这里只是部分代码。
解释:
df['text']
包含所有评论,df['stars']
包含评级(1 到 5)。我正在尝试使用多线程查找单词计数向量 temp_X
。 bag_of_words
是一些常用词的列表。
问题:
在没有多线程的情况下,我能够在大约 24 分钟内计算出 temp_X
,而上面的代码对于大小为 100k 的评论的数据集花费了 33 分钟。我的机器有 128GB 的 DRAM 和 12 个内核(6 个具有超线程的物理内核,即每个内核的线程数 = 2)。
我在这里做错了什么?
你的整个代码似乎是 CPU Bound
而不是 IO Bound
。你只是在使用 GIL
下的 threads
> 如此有效地运行一个线程加上开销。它只在一个内核上运行。要在多个内核上运行,请使用
使用
import multiprocessing
pool = multiprocessing.Pool()
l = pool.map_async(product_helper,job_args)
from multiprocessing.dummy import Pool as ThreadPool 只是 thread
模块的包装器。它仅使用 一个核心
,仅此而已。
我是一名优秀的程序员,十分优秀!