gpt4 book ai didi

python - 如何在使用 Keras 的深度学习中利用多处理和多线程?

转载 作者:太空狗 更新时间:2023-10-29 22:24:12 25 4
gpt4 key购买 nike

我原以为大多数框架(如 keras/tensorflow/...)会自动使用所有 CPU 核心,但实际上似乎并非如此。我只找到很少的资料介绍如何在深度学习训练过程中充分利用 CPU 的全部算力。我找到了一篇 article,介绍了以下库的用法:
from multiprocessing import Pool 
import psutil
import ray

另一方面,这个关于在多个进程中使用 Keras 模型的 answer 并没有用到上述库。既然 Keras 如此流行,是否有更优雅的方式在 Keras 中利用多进程(multiprocessing)?

  • 例如,如何修改下面这个简单的 RNN 实现,使训练过程中的 CPU 利用率至少达到 50%?

  • 我是否应该像下面被注释掉的 LSTM 那样,把第二个模型作为多任务同时运行?也就是说,能否通过使用更多的 CPU 资源来同时运行多个模型?

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from keras.layers.normalization import BatchNormalization
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import LSTM,SimpleRNN
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop

# Use a raw string for the Windows path: in "D:\Train.csv" the "\T" is an
# invalid escape sequence that only works by accident (DeprecationWarning
# since Python 3.6, SyntaxWarning since 3.12). The resulting path is the same.
df = pd.read_csv(r"D:\Train.csv", header=None)

# Label columns: every third column starting at column 2 (2, 5, ..., 1439),
# i.e. 480 columns -- identical to [i for i in range(1440) if i % 3 == 2].
index = list(range(2, 1440, 3))

Y_train = df[index]
df = df.values
# Build look-back windows: use the previous `look_back` rows to predict the next row.
def create_dataset(dataset, data_train, look_back=1):
    """Build sliding-window samples for sequence-to-one prediction.

    Sample ``X[i]`` is ``dataset[i:i + look_back, :]`` (the previous
    ``look_back`` rows) and its target ``Y[i]`` is ``data_train[i + look_back, :]``
    (the row right after the window).

    Args:
        dataset: 2-D array of input rows.
        data_train: 2-D array of target rows, row-aligned with ``dataset``.
        look_back: window length (number of past rows per sample).

    Returns:
        ``(X, Y)`` as NumPy arrays holding ``len(dataset) - look_back``
        samples, or empty arrays when there are too few rows for one window.
    """
    # The last complete window starts at len(dataset) - look_back - 1, so there
    # are exactly len(dataset) - look_back samples; looping one fewer time
    # would silently drop the final sample.
    n_samples = max(len(dataset) - look_back, 0)
    print("Len:", n_samples)
    dataX, dataY = [], []
    for i in range(n_samples):
        dataX.append(dataset[i:(i + look_back), :])
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)

Y_train = np.array(Y_train)
df = np.array(df)

look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Split data into train & test. shuffle=False keeps chronological order, so
# the test set is the final ~20% of the windows.
# Split only once: calling train_test_split again on trainX/trainY would shrink
# the training set and replace this held-out test set with a slice of the
# training period.
trainX, testX, trainY, testY = train_test_split(trainX, trainY, test_size=0.2, shuffle=False)

# Shape of train and test data
print("train size: {}".format(trainX.shape))
print("train Label size: {}".format(trainY.shape))
print("test size: {}".format(testX.shape))
print("test Label size: {}".format(testY.shape))
# Output as reported in the original question. NOTE: it was presumably produced
# with the duplicated split, and its label width (960) differs from the 480
# columns selected by `index`, so expect different numbers here:
#train size: (23, 10, 1440)
#train Label size: (23, 960)
#test size: (6, 10, 1440)
#test Label size: (6, 960)


# These names are used below but were never imported at the top of the script.
from keras.layers import Activation
from keras.callbacks import EarlyStopping, ReduceLROnPlateau

model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
# The output width must equal the number of label columns, so derive it from
# the data instead of hard-coding it (`index` selects 480 label columns).
model_RNN.add(Dense(trainY.shape[1]))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation('tanh'))
# Compile model
model_RNN.compile(loss='mean_squared_error', optimizer='adam')
callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
]
# Fit the model
hist_RNN = model_RNN.fit(
    trainX,
    trainY,
    epochs=50,
    batch_size=20,
    validation_data=(testX, testY),
    verbose=1,
    callbacks=callbacks,
)


# predict

Y_train = np.array(trainY)
# Labels of the test split (testY), not its inputs (testX).
Y_test = np.array(testY)

Y_RNN_Train_pred = model_RNN.predict(trainX)
Y_RNN_Test_pred = model_RNN.predict(testX)

train_MSE = mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE = mean_squared_error(testY, Y_RNN_Test_pred)

# create and fit the Simple LSTM model as 2nd model for multi-tasking

#model_LSTM = Sequential()
#model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))
#model_LSTM.add(Dense(units = 960))
#model_LSTM.add(BatchNormalization())
#model_LSTM.add(Activation('tanh'))
#model_LSTM.compile(loss='mean_squared_error', optimizer='adam')
#hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)

#Y_train=np.array(trainY)
#Y_test=np.array(testX)

#Y_LSTM_Train_pred=model_LSTM.predict(trainX)
#Y_LSTM_Test_pred=model_LSTM.predict(testX)

#train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)
#test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)

# Plot train/validation loss curves, one panel per trained model.
# The LSTM model above is commented out, so only add its panel when
# `hist_LSTM` actually exists; referencing it unconditionally raises NameError.
loss_histories = [(' RNN Loss on Train and Test data', hist_RNN)]
if 'hist_LSTM' in globals():
    loss_histories.append(('LSTM Loss on Train and Test data', hist_LSTM))

# Create all panels up front rather than calling plt.subplot() on top of the
# full-figure Axes returned by plt.subplots(), and keep Axes objects separate
# from the Line2D lists that plot() returns.
fig, axes = plt.subplots(1, len(loss_histories), figsize=(20, 15), squeeze=False)
for ax, (title, hist) in zip(axes[0], loss_histories):
    ax.plot(hist.history['loss'], label='Train loss')
    ax.plot(hist.history['val_loss'], label='Test/Validation/Prediction loss')
    ax.set_xlabel('Training steps (Epochs = 50)')
    ax.set_ylabel('Loss (MSE) for Sx-Sy & Sxy')
    ax.set_title(title)
    ax.legend()

plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
#plt.savefig('All_Losses_history_.png')
plt.show()

注意:我无法使用 CUDA,只能使用一台没有独立显卡(VGA)的高性能服务器。我的目标是利用多进程和多线程把 CPU 用满,而不是只用到约 30%——这意味着只用了一个核心,而我的 CPU 有四个核心!任何建议都将不胜感激。我已上传了一个格式化好的 CSV 数据集。

更新:我的硬件配置如下:

  • CPU:AMD A8-7650K Radeon R7 10 个计算核心 4C+6G 3.30 GHz
  • 内存:16GB
  • 操作系统:Win 7
  • Python 版本 3.6.6
  • Tensorflow 1.8.0 版
  • Keras 2.2.4 版

最佳答案

训练单个模型没有用满 100% 的 CPU 反而是好事!这样我们就有余量并行训练多个模型,从而缩短总体训练时间。

注意:如果您只是想加速这一个模型,请考虑使用 GPU,或调整超参数,例如批量大小和神经元数量(层大小)。

下面介绍如何使用 multiprocessing 同时训练多个模型(每个模型在一个独立的进程中训练,这些进程在机器的不同 CPU 核心上并行运行)。

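multiprocessing.Pool 会创建一组工作进程,并把待执行的作业分发给它们。每个进程取出一个作业并运行;一个作业完成后,该进程再取出下一个作业。注意:在 Windows 上(提问者使用的是 Win 7),multiprocessing 以 spawn 方式启动子进程,每个子进程都会重新导入主模块,因此创建进程池的代码必须放在 `if __name__ == '__main__':` 保护块中,否则会抛出 RuntimeError。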

import time
import signal
import multiprocessing

def init_worker():
    """Pool initializer: make this worker process ignore SIGINT (Ctrl+C).

    Because the workers ignore SIGINT, pressing Ctrl+C interrupts only the
    parent process instead of every worker at once.
    """
    sigint, ignore = signal.SIGINT, signal.SIG_IGN
    signal.signal(sigint, ignore)


def train_model(layer_size):
    '''
    Template for the function each pool worker runs: build, train, save and
    score one model whose Dense layer has ``layer_size`` units.

    The ``...`` arguments are placeholders (your data, file path, etc.) and
    must be replaced before this function can actually run.

    Args:
        layer_size: number of units in the Dense layer (the hyperparameter
            being varied across workers).

    Returns:
        whatever ``model_RNN.evaluate(...)`` returns (the evaluation score).
    '''
    # Keras is imported inside the worker function, presumably so each worker
    # process sets up its own Keras/TensorFlow state rather than inheriting one
    # from the parent.
    import keras
    from keras.models import Sequential
    from keras.layers import Dense

    print(f'Training a model with layer size {layer_size}')

    # build your model here
    model_RNN = Sequential()
    model_RNN.add(Dense(layer_size))

    # fit the model (the bit that takes time!)
    model_RNN.fit(...)

    # Demonstration only: simulate a long training run so the parallelism is
    # visible; remove this in real code.
    time.sleep(5)

    # save trained model to a file
    model_RNN.save(...)

    # You can also return other values, e.g. the evaluation score.
    # NOTE(review): the sample output shown after this snippet prints dicts with
    # 'size'/'score' keys; to get that, return something like
    # {'size': layer_size, 'score': model_RNN.evaluate(...)} instead.
    return model_RNN.evaluate(...)


# Only the main process may create the pool. With the "spawn" start method
# (the only one available on Windows) each worker re-imports this module;
# without this guard every worker would try to start its own pool and
# multiprocessing raises RuntimeError.
if __name__ == '__main__':
    num_workers = 4
    hyperparams = [800, 960, 1100]

    # The context manager terminates the worker processes once map() has
    # returned all results, so no processes are left behind.
    with multiprocessing.Pool(num_workers, init_worker) as pool:
        scores = pool.map(train_model, hyperparams)

    print(scores)

输出:

Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]

代码中的 time.sleep 可以很容易地演示这一点:您会看到 3 个进程同时开始训练,并且几乎同时完成。如果是单进程顺序执行,就必须等每个作业完成后才能开始下一个(哈欠!)。

编辑:OP 还希望看到完整的代码。这在 Stack Overflow 上不太容易,因为我无法在您的环境中用您的代码进行测试。我擅自把您的代码复制粘贴到了上面的模板中。您可能还需要补充一些导入语句,但这已经接近“可运行”的“完整”代码了。

import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score


def init_worker():
    """Initializer run once in each pool worker: ignore SIGINT (Ctrl+C).

    With the workers ignoring SIGINT, only the parent process reacts to
    Ctrl+C instead of every worker.
    """
    ignore_interrupts = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_interrupts)


def train_model(model_type, data=None):
    '''
    Build, train and score one recurrent Keras model inside a pool worker.

    Each call trains one model, so mapping this over several model types with
    a multiprocessing pool trains them in parallel on different CPU cores.

    Args:
        model_type: 'rnn' for a SimpleRNN layer or 'lstm' for an LSTM layer.
        data: optional ``(trainX, trainY, testX, testY)`` tuple. When omitted,
            the module-level globals with those names are used.

    Returns:
        dict with keys 'type', 'train_MSE' and 'test_MSE'.

    Raises:
        ValueError: if ``model_type`` is not 'rnn' or 'lstm'.
    '''
    # Validate before doing any work. An unknown type would otherwise silently
    # build a model without a recurrent layer, which only fails (or misbehaves)
    # much later during fit.
    recurrent_types = ('rnn', 'lstm')
    if model_type not in recurrent_types:
        raise ValueError(
            f"model_type must be one of {recurrent_types}, got {model_type!r}"
        )

    from keras.layers import LSTM, SimpleRNN, Dense, Activation
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from keras.layers.normalization import BatchNormalization

    if data is None:
        # Fall back to the module-level arrays prepared by the script.
        data = (trainX, trainY, testX, testY)
    train_x, train_y, test_x, test_y = data

    print(f'Training a model: {model_type}')

    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]

    recurrent_layer = SimpleRNN if model_type == 'rnn' else LSTM

    model = Sequential()
    model.add(recurrent_layer(units=1440, input_shape=(train_x.shape[1], train_x.shape[2])))
    # One output unit per label column (480 for the `index` selection in this
    # script), derived from the data instead of hard-coded.
    model.add(Dense(train_y.shape[1]))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(
        train_x,
        train_y,
        epochs=50,
        batch_size=20,
        validation_data=(test_x, test_y),
        verbose=1,
        callbacks=callbacks,
    )

    # predict
    Y_Train_pred = model.predict(train_x)
    Y_Test_pred = model.predict(test_x)

    train_MSE = mean_squared_error(train_y, Y_Train_pred)
    test_MSE = mean_squared_error(test_y, Y_Test_pred)

    # Plain dict so the result pickles cleanly back to the parent process.
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}


# Your code
# ---------

# Raw string: in "D:\Train.csv" the "\T" is an invalid escape sequence that
# only works by accident (DeprecationWarning since Python 3.6, SyntaxWarning
# since 3.12). The resulting path is unchanged.
df = pd.read_csv(r"D:\Train.csv", header=None)

# Label columns: every third column starting at 2 (2, 5, ..., 1439), i.e. 480
# columns -- identical to [i for i in range(1440) if i % 3 == 2].
index = list(range(2, 1440, 3))

Y_train = df[index]
df = df.values

# Build look-back windows: use the previous `look_back` rows to predict the next row.
def create_dataset(dataset, data_train, look_back=1):
    """Build sliding-window samples for sequence-to-one prediction.

    Sample ``X[i]`` is ``dataset[i:i + look_back, :]`` (the previous
    ``look_back`` rows) and its target ``Y[i]`` is ``data_train[i + look_back, :]``
    (the row right after the window).

    Args:
        dataset: 2-D array of input rows.
        data_train: 2-D array of target rows, row-aligned with ``dataset``.
        look_back: window length (number of past rows per sample).

    Returns:
        ``(X, Y)`` as NumPy arrays holding ``len(dataset) - look_back``
        samples, or empty arrays when there are too few rows for one window.
    """
    # The last complete window starts at len(dataset) - look_back - 1, so there
    # are exactly len(dataset) - look_back samples; looping one fewer time
    # would silently drop the final sample.
    n_samples = max(len(dataset) - look_back, 0)
    print("Len:", n_samples)
    dataX, dataY = [], []
    for i in range(n_samples):
        dataX.append(dataset[i : (i + look_back), :])
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)


# Plain NumPy arrays for the label frame and the feature matrix.
Y_train, df = np.array(Y_train), np.array(df)

# Each sample is built from the previous `look_back` rows.
look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Chronological 80/20 train/test split (shuffle=False keeps time order).
trainX, testX, trainY, testY = train_test_split(
    trainX,
    trainY,
    shuffle=False,
    test_size=0.2,
)

# My Code
# -------

# Only the main process may create the pool. With the "spawn" start method
# (the only one available on Windows) each worker re-imports this module;
# without this guard every worker would try to start its own pool and
# multiprocessing raises RuntimeError. The data preparation above stays at
# module level, so each re-imported module still defines the
# trainX/trainY/testX/testY globals that train_model reads.
if __name__ == '__main__':
    num_workers = 2
    model_types = ['rnn', 'lstm']

    # The context manager terminates the worker processes once map() has
    # returned all results.
    with multiprocessing.Pool(num_workers, init_worker) as pool:
        scores = pool.map(train_model, model_types)

    print(scores)

程序输出:

[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866}, 
{'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]

关于python - 如何在使用 Keras 的深度学习中利用多处理和多线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56344611/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com