
python - How to parallelize a CNN's evaluation of images with PyTorch


I have a fairly long script that uses a CNN to classify people in drone footage as human or non-human. The general flow is: (1) Create a video object and extract "captures" from it at a specified interval in seconds. (2) Instantiate a Model class that loads a PyTorch CNN. (3) For each capture, break it into smaller (and overlapping) images, which the CNN classifies as human or non-human. (4) Build a list of the coordinates that satisfied the previous step. (5) Draw red squares around those coordinates and save the labeled image. (6) Repeat the process for each capture.

The bottleneck is at the image level: the CNN produces and evaluates the crops sequentially. I would love to parallelize this process, but it is beyond my current knowledge/experience.

Any suggestions? The script is below for reference.

import torch, torchvision
from torchvision import datasets, models, transforms
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import time
from torchsummary import summary

import numpy as np
import matplotlib.pyplot as plt
import os

from PIL import Image
import shutil
from PIL import Image, ImageDraw
import random
import cv2

# Wraps a saved PyTorch model and classifies a single PIL crop.
class Model():

    def __init__(self, model):
        self.idx_to_class = {1: 'No human', 0: 'Human'}
        self.image_transforms = {
            'test': transforms.Compose([
                transforms.Resize(size=256),
                transforms.CenterCrop(size=224),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406],
                                     [0.229, 0.224, 0.225])
            ])
        }
        self.model = torch.load(model)

    # Returns the predicted class index: 0 for 'Human', 1 for 'No human'.
    def predict(self, test_image_name):
        transform = self.image_transforms['test']
        # test_image = Image.open(test_image_name)
        test_image_tensor = transform(test_image_name)

        if torch.cuda.is_available():
            test_image_tensor = test_image_tensor.view(1, 3, 224, 224).cuda()
        else:
            test_image_tensor = test_image_tensor.view(1, 3, 224, 224)

        with torch.no_grad():
            self.model.eval()
            out = self.model(test_image_tensor)
            ps = torch.exp(out)
            topk, topclass = ps.topk(1, dim=1)
            return topclass.cpu().numpy()[0][0]


# Slides an overlapping square window across one frame and marks positives.
class Image_classifier():

    def __init__(self, image, folder, positive_location, model):
        self.name = image
        self.alias = image.split('.')[0]
        self.folder = folder
        self.src = Image.open(f"{self.folder}/{self.name}")
        self.width = self.src.size[0]
        self.square_size = int(self.width/25)
        self.max_down = int(self.src.height/self.square_size) * self.square_size - self.square_size
        self.max_right = int(self.src.width/self.square_size) * self.square_size - self.square_size
        self.offset = int(self.square_size/3)
        self.positive_location = positive_location
        self.model = model

    # Builds the full list of overlapping window coordinates (left, top, right, bottom).
    def window_coordinates(self):

        # One horizontal row of windows at height y.
        def right_pass(y):
            x_coords = [x for x in range(0, self.max_right, self.offset)]
            y_coords = [y for x in range(0, self.max_right, self.offset)]
            return [(x, y, x+self.square_size, y+self.square_size) for x, y in zip(x_coords, y_coords)]

        #v_pass = np.vectorize(right_pass)

        y_values = [y for y in range(0, self.max_down, self.offset)]
        coordinates = [right_pass(y) for y in y_values]
        self.coordinates = [item for sublist in coordinates for item in sublist]

    def predict_coord(self, coord):
        sample = self.src.crop(coord)
        return self.model.predict(sample)

    # Keeps only the coordinates the model classified as 'Human' (class 0).
    def parse_coordinates(self):
        new_coords = [coord if self.predict_coord(coord) == 0 else 0 for coord in self.coordinates]
        while 0 in new_coords:
            new_coords.remove(0)
        self.coordinates = new_coords

    def select_squares(self):
        self.window_coordinates()
        self.parse_coordinates()
        self.drawable = ImageDraw.Draw(self.src)
        for coord in self.coordinates:
            self.drawable.rectangle(list(coord), fill=None, outline='red')
        self.src.save(f"{self.positive_location}/{self.alias}.jpg")


# Extracts frames from a video and runs the sliding-window classifier on each.
class Video_classifier():

    def __init__(self, video, root, seconds):
        self.video = video
        self.alias = self.video.split('.')[0]
        self.root = root
        self.seconds = seconds
        self.model = Model(model='/home/team4/output/_model_142.pt')

        self.folder = f"{self.root}/{self.alias}"
        if os.path.exists(self.folder):
            shutil.rmtree(self.folder)
        os.mkdir(self.folder)

        self.positive_location = f"{self.root}/{self.alias}/positives"
        if os.path.exists(self.positive_location):
            shutil.rmtree(self.positive_location)
        os.mkdir(self.positive_location)

    # Saves one frame every self.seconds seconds of video.
    def get_frames(self):
        cam = cv2.VideoCapture(f"{self.root}/{self.video}")

        (major_ver, minor_ver, subminor_ver) = (cv2.__version__).split('.')
        if int(major_ver) < 3:
            fps = round(cam.get(cv2.cv.CV_CAP_PROP_FPS))  # fixed: was `video.get(...)`, an undefined name
        else:
            fps = round(cam.get(cv2.CAP_PROP_FPS))

        current_frame = 0
        while True:
            ret, frame = cam.read()
            if ret:
                if current_frame % (self.seconds*fps) == 0:
                    f_name = f"{self.alias}_{current_frame}.jpg"
                    cv2.imwrite(f_name, frame)
                    shutil.move(f_name, self.folder)

                current_frame += 1
            else:
                break

        cam.release()
        cv2.destroyAllWindows()

    def read_dir(self):
        self.files = [f for f in os.listdir(self.folder) if os.path.isfile(os.path.join(self.folder, f))]

    def classify_frames(self):
        self.get_frames()
        self.read_dir()
        for file in self.files:
            image = Image_classifier(image=file, folder=self.folder, positive_location=self.positive_location,
                                     model=self.model)
            image.select_squares()


test = Video_classifier(video='refugee_test.mp4', root='/home/team4/Untitled Folder 1', seconds=10)
test.classify_frames()


Apologies for the missing comments, docstrings, etc. It's a work in progress.

Best Answer

You'll want to learn how to use the multiprocessing library. There are undoubtedly many ways to approach this, because the library is quite extensive. Assuming you need to keep track of the different processes so the images can be put back together in the correct order, I would take the following approach. It is more involved than simply creating a process pool, but it lets you track every process.

from multiprocessing import Process, Pipe

# keeps track of connections
conList = []
for i in range(numberOfProcessesNeeded):
    # creates a connection
    recv, send = Pipe()

    # prepares the process with the function to run and its arguments
    pid = Process(target=imageProcessingFunction, args=(i, command, send))

    # starts the process
    pid.start()

    # keeps track of the process
    conList.append([i, recv])

Now, one of the function's arguments must be the "send" end from the Pipe creation, so that when the process finishes it can send its results back.

In the image-processing function, use send.send(DataToReturn) instead of return(DataToReturn).
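
For example, here is a minimal sketch of such a worker, matching the snippet above; the body is a hypothetical stand-in that you would replace with the actual crop-and-predict work:

def imageProcessingFunction(i, command, send):
    # ... do the real image processing on `command` here (placeholder) ...
    result = f"processed item {i}"
    send.send(result)   # replaces `return result`
    send.close()        # close the send end once the result has been delivered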

In the main function, collect the data like this:

data = []
for i in conList:
    # this will wait until each process has finished, then collect its data
    out = i[1].recv()
    data.append(out)

After this, the list data will hold the output for all of the images, in the order in which the processes were launched.
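
Putting the pieces together, here is a minimal, self-contained sketch of the whole pattern; the worker simply doubles a number as a stand-in for the real image processing:

from multiprocessing import Process, Pipe

def worker(i, item, send):
    send.send((i, item * 2))   # stand-in for the real image processing
    send.close()

if __name__ == '__main__':
    conList = []
    for i in range(4):
        recv, send = Pipe()
        p = Process(target=worker, args=(i, i + 10, send))
        p.start()
        conList.append([i, recv, p])

    data = []
    for i, recv, p in conList:
        data.append(recv.recv())   # blocks until worker i has sent its result
        p.join()

    print(data)   # [(0, 20), (1, 22), (2, 24), (3, 26)], in launch order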

Hopefully this isn't confusing; it's a tricky topic. More information can be found here:
https://docs.python.org/3.4/library/multiprocessing.html
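
As a side note beyond this answer: because the model already runs on the GPU, the crop evaluation can also be parallelized on the device itself by batching the crops into one tensor and doing a single forward pass, instead of calling predict() once per crop. A minimal sketch, assuming model is the loaded network, transform is the 'test' transform, and crops is a list of PIL crops as in the question's code:

import torch

def predict_batch(model, transform, crops, device='cuda'):
    # stack every transformed crop into one (N, 3, 224, 224) batch
    batch = torch.stack([transform(c) for c in crops]).to(device)
    model.eval()
    with torch.no_grad():
        out = model(batch)
        ps = torch.exp(out)
        # one class index per crop, matching the per-crop predict() output
        return ps.topk(1, dim=1).indices.squeeze(1).cpu().tolist()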

For the original question, python - How to parallelize a CNN's evaluation of images with PyTorch, see Stack Overflow: https://stackoverflow.com/questions/58959799/
