gpt4 book ai didi

python - DCGAN 调试:生成的结果全是垃圾

转载 作者:行者123 更新时间:2023-12-03 10:08:08 27 4
gpt4 key购买 nike

简介:
我正在尝试让 CDCGAN(条件深度卷积生成对抗网络)处理 MNIST 数据集,考虑到我使用的库(PyTorch)在其网站上有教程,这应该相当容易。
但我似乎无法让它工作,它只会产生垃圾图像,或者出现模式崩溃(mode collapse),或者两者兼而有之。
我试过的:

  • 将模型改为条件式(半监督学习)
  • 使用批量归一化(batch norm)
  • 除了生成器和鉴别器上的输入/输出层之外,在每一层上使用 dropout
  • 使用标签平滑来抑制过度自信
  • 向图像添加噪声(我猜这被称为实例噪声,instance noise)以获得更好的数据分布
  • 使用 Leaky ReLU 来避免梯度消失
  • 使用重放缓冲区(replay buffer)来防止遗忘已学到的东西并防止过拟合
  • 调整超参数
  • 将其与 PyTorch 教程中的模型进行比较
    (which is basically what I did, apart from some things like the Embedding layer etc.)

  • 我的模型生成的图像:
    超参数:
    batch_size=50, learning_rate_discriminator=0.0001, learning_rate_generator=0.0003, shuffle=True, ndf=64, ngf=64, dropout=0.5
    enter image description here
    enter image description here
    enter image description here
    enter image description here
    batch_size=50, learning_rate_discriminator=0.0003, learning_rate_generator=0.0003, shuffle=True, ndf=64, ngf=64, dropout=0
    enter image description here
    enter image description here
    enter image description here
    enter image description here
    图片 Pytorch tutorial Model生成:
    Code for the pytorch tutorial dcgan model
    作为比较,这里是来自 PyTorch 教程(tutorial)的 DCGAN 生成的图像:
    enter image description here
    enter image description here
    enter image description here
    我的代码:
    import torch
    import torch.nn as nn
    import torchvision
    from torchvision import transforms, datasets
    import torch.nn.functional as F
    from torch import optim as optim
    from torch.utils.tensorboard import SummaryWriter

    import numpy as np

    import os
    import time


    class Discriminator(torch.nn.Module):
        """Conditional DCGAN discriminator.

        ``forward`` reshapes the label tensor to ``(-1, 10)``, projects it with a
        linear layer to a ``1 x 64 x 64`` plane, stacks that plane onto the image
        as an extra channel and runs five convolutional stages that end in a
        sigmoid.

        Args:
            ndf: Base number of discriminator feature maps.
            dropout_value: Dropout probability used in the three middle stages.
        """

        def __init__(self, ndf=16, dropout_value=0.5):
            super().__init__()
            self.ndf = ndf
            # Attribute name (including its spelling) is part of the public surface.
            self.droupout_value = dropout_value

            def down_stage(c_in, c_out, use_norm, use_dropout):
                # One stride-2 convolution, optionally followed by batch norm,
                # then LeakyReLU and optionally dropout.
                layers = [nn.Conv2d(in_channels=c_in, out_channels=c_out,
                                    kernel_size=4, stride=2, padding=1, bias=False)]
                if use_norm:
                    layers.append(nn.BatchNorm2d(c_out))
                layers.append(nn.LeakyReLU(0.2))
                if use_dropout:
                    layers.append(nn.Dropout(dropout_value))
                return nn.Sequential(*layers)

            # Projects the 10-dimensional label vector to a 64x64 plane.
            self.condi = nn.Sequential(
                nn.Linear(in_features=10, out_features=64 * 64)
            )

            # Two input channels: the image plus the projected label plane.
            self.hidden0 = down_stage(2, ndf, use_norm=False, use_dropout=False)
            self.hidden1 = down_stage(ndf, ndf * 2, use_norm=True, use_dropout=True)
            # This stage has no batch norm (it was disabled in the original).
            self.hidden2 = down_stage(ndf * 2, ndf * 4, use_norm=False, use_dropout=True)
            self.hidden3 = down_stage(ndf * 4, ndf * 8, use_norm=True, use_dropout=True)

            self.out = nn.Sequential(
                nn.Conv2d(in_channels=ndf * 8, out_channels=1, kernel_size=4,
                          stride=1, padding=0, bias=False),
                torch.nn.Sigmoid(),
            )

        def forward(self, x, y):
            """Score images ``x`` given labels ``y`` (any shape viewable as (-1, 10))."""
            label_plane = self.condi(y.view(-1, 10)).view(-1, 1, 64, 64)
            features = torch.cat((x, label_plane), dim=1)

            for stage in (self.hidden0, self.hidden1, self.hidden2, self.hidden3, self.out):
                features = stage(features)

            return features


    class Generator(torch.nn.Module):
        """Conditional DCGAN generator.

        ``forward`` concatenates the latent tensor and the label tensor along the
        channel axis and passes the result through five transposed-convolution
        stages that end in ``tanh``.

        Args:
            n_features: Number of latent channels.
            ngf: Base number of generator feature maps.
            c_channels: Number of channels of the generated image. The original
                stored this value but always emitted one channel; it is now
                honoured (the default of 1 keeps the old behaviour).
            dropout_value: Dropout probability used in the three middle stages.
        """

        def __init__(self, n_features=100, ngf=16, c_channels=1, dropout_value=0.5):
            super().__init__()
            self.ngf = ngf
            self.n_features = n_features
            self.c_channels = c_channels
            # Attribute name (including its spelling) is part of the public surface.
            self.droupout_value = dropout_value

            def up_stage(c_in, c_out, stride, padding, use_norm, use_dropout):
                # One transposed convolution, optionally followed by batch norm,
                # then LeakyReLU and optionally dropout.
                layers = [nn.ConvTranspose2d(in_channels=c_in, out_channels=c_out,
                                             kernel_size=4, stride=stride,
                                             padding=padding, bias=False)]
                if use_norm:
                    layers.append(nn.BatchNorm2d(c_out))
                layers.append(nn.LeakyReLU(0.2))
                if use_dropout:
                    layers.append(nn.Dropout(dropout_value))
                return nn.Sequential(*layers)

            # "+ 10": the label tensor is concatenated onto the latent channels.
            self.hidden0 = up_stage(n_features + 10, ngf * 8, stride=1, padding=0,
                                    use_norm=True, use_dropout=False)
            # This stage has no batch norm (it was disabled in the original).
            self.hidden1 = up_stage(ngf * 8, ngf * 4, stride=2, padding=1,
                                    use_norm=False, use_dropout=True)
            self.hidden2 = up_stage(ngf * 4, ngf * 2, stride=2, padding=1,
                                    use_norm=True, use_dropout=True)
            self.hidden3 = up_stage(ngf * 2, ngf, stride=2, padding=1,
                                    use_norm=True, use_dropout=True)

            self.out = nn.Sequential(
                nn.ConvTranspose2d(in_channels=ngf, out_channels=c_channels,
                                   kernel_size=4, stride=2, padding=1, bias=False),
                nn.Tanh(),
            )

        def forward(self, x, y):
            """Generate images from latent ``x`` and labels ``y`` (joined on dim 1)."""
            features = torch.cat((x, y), dim=1)

            for stage in (self.hidden0, self.hidden1, self.hidden2, self.hidden3, self.out):
                features = stage(features)

            return features


    class Logger:
        """TensorBoard and filesystem logging for one GAN training run.

        ``model1``/``m1_optimizer`` are stored under the "generator" keys and
        ``model2``/``m2_optimizer`` under the "discriminator" keys of the
        checkpoint written by :meth:`log_model`; histogram tags follow the same
        convention.

        Args:
            model_name: Name used for the log directory layout.
            model1: First model (treated as the generator).
            model2: Second model (treated as the discriminator).
            m1_optimizer: Optimizer belonging to ``model1``.
            m2_optimizer: Optimizer belonging to ``model2``.
            model_parameter: Hyperparameter dict; must contain ``"Epochs"``.
            train_loader: Data loader, only used for ``len()`` in progress output.
        """

        def __init__(self, model_name, model1, model2, m1_optimizer, m2_optimizer, model_parameter, train_loader):
            self.out_dir = "data"
            self.model_name = model_name
            self.train_loader = train_loader
            self.model1 = model1
            self.model2 = model2
            self.model_parameter = model_parameter
            self.m1_optimizer = m1_optimizer
            self.m2_optimizer = m2_optimizer

            # Exclude the epoch count from the experiment name so a run that is
            # stopped and resumed with a different "Epochs" maps to the same folder.
            self.experiment_name = '_'.join("{!s}={!r}".format(k, v) for (k, v) in model_parameter.items())\
                .replace("Epochs" + "=" + str(model_parameter["Epochs"]), "")

            self.d_error = 0
            self.g_error = 0

            self.tb = SummaryWriter(log_dir=str(self.out_dir + "/log/" + self.model_name + "/runs/" + self.experiment_name))

            log_root = os.path.join(os.getcwd(), self.out_dir, "log", self.model_name)
            self.path_image = os.path.join(log_root, "images", self.experiment_name)
            self.path_model = os.path.join(log_root, "model", self.experiment_name)

            # Best effort: an already existing folder is fine, any other failure is
            # reported but does not abort the run.
            for folder in (self.path_image, self.path_model):
                try:
                    os.makedirs(folder, exist_ok=True)
                except OSError as e:
                    print("WARNING: ", str(e))

        def log_graph(self, model1_input, model2_input, model1_label, model2_label):
            """Write both model graphs to TensorBoard using the given example inputs."""
            self.tb.add_graph(self.model1, input_to_model=(model1_input, model1_label))
            self.tb.add_graph(self.model2, input_to_model=(model2_input, model2_label))

        def log(self, num_epoch, d_error, g_error):
            """Record the latest discriminator/generator errors at step ``num_epoch``."""
            self.d_error = d_error
            self.g_error = g_error

            self.tb.add_scalar("Discriminator Train Error", self.d_error, num_epoch)
            self.tb.add_scalar("Generator Train Error", self.g_error, num_epoch)

        def log_image(self, images, epoch, batch_num):
            """Save ``images`` as one grid PNG and push the grid to TensorBoard."""
            grid = torchvision.utils.make_grid(images)
            # os.path.join instead of a hard-coded "\\" so the file lands inside
            # path_image on every platform.
            torchvision.utils.save_image(
                grid, os.path.join(self.path_image, f'Epoch_{epoch}_batch_{batch_num}.png'))

            self.tb.add_image("Generator Image", grid)

        def log_histogramm(self):
            """Write weight and gradient histograms of both models.

            Tags are prefixed ``gen_`` for ``model1`` and ``dis_`` for ``model2``
            so equally named layers of the two models do not overwrite each
            other. Parameters without a gradient are logged without a gradient
            histogram.
            """
            step = self.model_parameter["Epochs"]
            for prefix, model in (("gen", self.model1), ("dis", self.model2)):
                for name, param in model.named_parameters():
                    self.tb.add_histogram(f'{prefix}_{name}', param, step)
                    if param.grad is not None:
                        self.tb.add_histogram(f'{prefix}_{name}.grad', param.grad, step)

        def log_model(self, num_epoch):
            """Save model and optimizer state dicts to a timestamped ``.pth`` file."""
            checkpoint = {
                "epoch": num_epoch,
                "model_generator_state_dict": self.model1.state_dict(),
                "model_discriminator_state_dict": self.model2.state_dict(),
                "optimizer_generator_state_dict": self.m1_optimizer.state_dict(),
                "optimizer_discriminator_state_dict": self.m2_optimizer.state_dict(),
            }
            torch.save(checkpoint, os.path.join(self.path_model, f'{time.time()}_epoch{num_epoch}.pth'))

        def close(self, logger, images, num_epoch, d_error, g_error):
            """Write a final checkpoint, histograms and scalars, then close TensorBoard.

            ``logger`` is the instance the final records are written through;
            ``images`` is accepted for interface compatibility and not used.
            """
            logger.log_model(num_epoch)
            logger.log_histogramm()
            logger.log(num_epoch, d_error, g_error)
            self.tb.close()

        def display_stats(self, epoch, batch_num, dis_error, gen_error):
            """Print epoch/batch progress and the current losses."""
            print(f'Epoch: [{epoch}/{self.model_parameter["Epochs"]}] '
                  f'Batch: [{batch_num}/{len(self.train_loader)}] '
                  f'Loss_D: {dis_error.data.cpu()}, '
                  f'Loss_G: {gen_error.data.cpu()}')


    def get_MNIST_dataset(num_workers_loader, model_parameter, out_dir="data"):
        """Build the MNIST training dataset and its data loader.

        Images are resized and center-cropped to 64x64, converted to tensors and
        normalised with mean 0.5 / std 0.5.

        Args:
            num_workers_loader: Worker count passed to the ``DataLoader``.
            model_parameter: Dict providing ``"batch_size"`` and ``"shuffle"``.
            out_dir: Root folder the dataset is downloaded to / read from.

        Returns:
            Tuple ``(dataset, train_loader)``.
        """
        preprocessing = transforms.Compose([
            transforms.Resize((64, 64)),
            transforms.CenterCrop((64, 64)),
            transforms.ToTensor(),
            torchvision.transforms.Normalize(mean=[0.5], std=[0.5]),
        ])

        dataset = datasets.MNIST(root=out_dir, train=True, download=True, transform=preprocessing)

        loader_options = {
            "batch_size": model_parameter["batch_size"],
            "num_workers": num_workers_loader,
            "shuffle": model_parameter["shuffle"],
        }
        train_loader = torch.utils.data.DataLoader(dataset, **loader_options)

        return dataset, train_loader


    def train_discriminator(p_optimizer, p_noise, p_images, p_fake_target, p_real_target, p_images_labels, p_fake_labels, device):
        """Run one discriminator update on a real batch and a generated batch.

        Relies on the module-level ``discriminator``, ``generator`` and ``loss``
        objects.

        Args:
            p_optimizer: Optimizer of the discriminator.
            p_noise: Latent input for the generator.
            p_images: Batch of real images.
            p_fake_target: Target values for the generated batch.
            p_real_target: Target values for the real batch.
            p_images_labels: Label encoding belonging to ``p_images``.
            p_fake_labels: Label encoding used to generate the fake batch.
            device: Device passed on to ``add_noise_to_image``.

        Returns:
            Sum of the fake-batch and real-batch losses.
        """
        p_optimizer.zero_grad()

        # Pass 1: real data.
        real_loss = loss(discriminator(p_images, p_images_labels), p_real_target)
        real_loss.backward()

        # Pass 2: generated data. The generator output is detached so this
        # backward pass does not reach the generator.
        generated = generator(p_noise, p_fake_labels).detach()
        generated = add_noise_to_image(generated, device)
        fake_loss = loss(discriminator(generated, p_fake_labels), p_fake_target)
        fake_loss.backward()

        p_optimizer.step()

        return fake_loss + real_loss


    def train_generator(p_optimizer, p_noise, p_real_target, p_fake_labels, device):
        """Run one generator update.

        Relies on the module-level ``discriminator``, ``generator`` and ``loss``
        objects.

        Args:
            p_optimizer: Optimizer of the generator.
            p_noise: Latent input for the generator.
            p_real_target: Target values the discriminator uses for real data.
            p_fake_labels: Label encoding used to generate the batch.
            device: Device passed on to ``add_noise_to_image``.

        Returns:
            Tuple ``(fake_images, pred_dis_fake, error_fake)``: the noised
            generated images, the discriminator output for them and the loss.
        """
        p_optimizer.zero_grad()

        generated = add_noise_to_image(generator(p_noise, p_fake_labels), device)
        verdict = discriminator(generated, p_fake_labels)

        # The "real" target is used on purpose instead of the "fake" one: the
        # generator is rewarded when the discriminator is wrong.
        generator_loss = loss(verdict, p_real_target)
        generator_loss.backward()

        p_optimizer.step()

        return generated, verdict, generator_loss


    # TODO change to a truncated normal distribution
    def get_noise(batch_size, n_features=100):
        """Return a CPU float tensor of shape (batch_size, n_features, 1, 1) drawn from U(-1, 1)."""
        latent = torch.empty(batch_size, n_features, 1, 1, dtype=torch.float32)
        latent.uniform_(-1, 1)
        return latent


    # The targets of real and fake data are flipped on purpose (real -> low
    # values); the author was told this gives better gradient flow.
    def get_real_data_target(batch_size):
        """Return smoothed targets for real data: shape (batch_size, 1, 1, 1), drawn from U(0.0, 0.2)."""
        targets = torch.empty(batch_size, 1, 1, 1, dtype=torch.float32)
        targets.uniform_(0.0, 0.2)
        return targets


    def get_fake_data_target(batch_size):
        """Return smoothed targets for fake data: shape (batch_size, 1, 1, 1), drawn from U(0.8, 1.1)."""
        targets = torch.empty(batch_size, 1, 1, 1, dtype=torch.float32)
        targets.uniform_(0.8, 1.1)
        return targets


    def image_to_vector(images):
        """Flatten every dimension after the first, keeping the batch axis."""
        return images.flatten(start_dim=1, end_dim=-1)


    def vector_to_image(images):
        """View a batch of flat vectors as single-channel 28x28 images."""
        batch = images.size(0)
        target_shape = (batch, 1, 28, 28)
        return images.view(*target_shape)


    def get_rand_labels(batch_size, num_classes=10):
        """Draw ``batch_size`` random class labels in ``[0, num_classes)``.

        ``torch.randint`` treats ``high`` as exclusive; the previous ``high=9``
        therefore never produced the digit 9.

        Args:
            batch_size: Number of labels to draw.
            num_classes: Number of classes (10 for MNIST digits).

        Returns:
            1-D integer tensor of length ``batch_size``.
        """
        return torch.randint(low=0, high=num_classes, size=(batch_size,))


    def load_model(model_load_path):
        """Restore models and optimizers from a checkpoint, if a path is given.

        Operates on the module-level ``discriminator``, ``generator``,
        ``dis_opti`` and ``gen_opti`` objects.

        Args:
            model_load_path: Checkpoint path; a falsy value skips loading.

        Returns:
            The epoch stored in the checkpoint, or 0 when nothing was loaded.
        """
        if not model_load_path:
            return 0

        state = torch.load(model_load_path)

        discriminator.load_state_dict(state["model_discriminator_state_dict"])
        generator.load_state_dict(state["model_generator_state_dict"])

        dis_opti.load_state_dict(state["optimizer_discriminator_state_dict"])
        gen_opti.load_state_dict(state["optimizer_generator_state_dict"])

        return state["epoch"]


    def init_model_optimizer(model_parameter, device):
        """Create both networks on ``device`` together with their Adam optimizers.

        Args:
            model_parameter: Dict providing ``"ndf"``, ``"ngf"``, ``"dropout"``,
                ``"learning_rate_dis"`` and ``"learning_rate_gen"``.
            device: Device the models are moved to.

        Returns:
            Tuple ``(discriminator, generator, dis_opti, gen_opti)``.
        """
        dropout = model_parameter["dropout"]
        adam_betas = (0.5, 0.999)

        # Models
        discriminator = Discriminator(ndf=model_parameter["ndf"], dropout_value=dropout).to(device)
        generator = Generator(ngf=model_parameter["ngf"], dropout_value=dropout).to(device)

        # Optimizers
        dis_opti = optim.Adam(discriminator.parameters(), lr=model_parameter["learning_rate_dis"], betas=adam_betas)
        gen_opti = optim.Adam(generator.parameters(), lr=model_parameter["learning_rate_gen"], betas=adam_betas)

        return discriminator, generator, dis_opti, gen_opti


    def get_hot_vector_encode(labels, device):
        """One-hot encode ``labels`` over 10 classes as a (-1, 10, 1, 1) tensor on ``device``."""
        identity = torch.eye(10)
        one_hot = identity[labels]  # row i of the identity is the code for class i
        return one_hot.view(-1, 10, 1, 1).to(device)


    def add_noise_to_image(images, device, level_of_noise=0.1):
        """Add Gaussian instance noise to every image of a batch.

        The previous implementation used ``images[0]``, i.e. only the first
        image, which was then broadcast against the noise: every entry of the
        returned batch was a noisy copy of that single image.

        Args:
            images: Image tensor (any shape).
            device: Device the result is placed on.
            level_of_noise: Standard deviation of the added noise.

        Returns:
            Tensor of the same shape as ``images`` on ``device``.
        """
        images = images.to(device)
        return images + level_of_noise * torch.randn_like(images)


    # Script entry point: builds the data loader, models, optimizers and logger,
    # then alternates discriminator and generator updates per batch and
    # periodically trains the discriminator on a buffer of replayed fake samples.
    # NOTE(review): the indentation of this script was lost in the paste, so the
    # nesting of the statements below (batch loop vs. epoch loop vs. after the
    # loops) cannot be read off this text -- confirm against the original file.
    if __name__ == "__main__":
    # Hyperparameter
    model_parameter = {
    "batch_size": 500,
    "learning_rate_dis": 0.0002,
    "learning_rate_gen": 0.0002,
    "shuffle": False,
    "Epochs": 10,
    "ndf": 64,
    "ngf": 64,
    "dropout": 0.5
    }

    # Parameter
    r_frequent = 10 # How many samples we save for replay per batch (batch_size / r_frequent).
    model_name = "CDCGAN" # The name of you model e.g. "Gan"
    num_workers_loader = 1 # How many workers should load the data
    sample_save_size = 16 # How many numbers your saved imaged should show
    device = "cuda" # Which device should be used to train the neural network
    model_load_path = "" # If set load model instead of training from new
    num_epoch_log = 1 # How frequent you want to log/
    torch.manual_seed(43) # Sets a seed for torch for reproducibility

    dataset_train, train_loader = get_MNIST_dataset(num_workers_loader, model_parameter) # Get dataset

    # Initialize the Models and optimizer
    discriminator, generator, dis_opti, gen_opti = init_model_optimizer(model_parameter, device) # Init model/Optimizer

    start_epoch = load_model(model_load_path) # when we want to load a model

    # Init Logger
    # Argument order: generator and its optimizer first, discriminator second.
    logger = Logger(model_name, generator, discriminator, gen_opti, dis_opti, model_parameter, train_loader)

    # Module-level loss used by train_discriminator / train_generator.
    loss = nn.BCELoss()

    images, labels = next(iter(train_loader)) # For logging

    # For testing
    # pred = generator(get_noise(model_parameter["batch_size"]).to(device), get_hot_vector_encode(get_rand_labels(model_parameter["batch_size"]), device))
    # dis = discriminator(images.to(device), get_hot_vector_encode(labels, device))

    logger.log_graph(get_noise(model_parameter["batch_size"]).to(device), images.to(device),
    get_hot_vector_encode(get_rand_labels(model_parameter["batch_size"]), device),
    get_hot_vector_encode(labels, device))


    # Array to store
    exp_replay = torch.tensor([]).to(device)

    for num_epoch in range(start_epoch, model_parameter["Epochs"]):
    for batch_num, data_loader in enumerate(train_loader):
    images, labels = data_loader
    images = add_noise_to_image(images, device) # Add noise to the images

    # NOTE(review): noise and targets are sized with the configured batch_size,
    # not with the size of the batch actually loaded -- assumes every batch from
    # train_loader is full; TODO confirm.
    # 1. Train Discriminator
    dis_error = train_discriminator(
    dis_opti,
    get_noise(model_parameter["batch_size"]).to(device),
    images.to(device),
    get_fake_data_target(model_parameter["batch_size"]).to(device),
    get_real_data_target(model_parameter["batch_size"]).to(device),
    get_hot_vector_encode(labels, device),
    get_hot_vector_encode(
    get_rand_labels(model_parameter["batch_size"]), device),
    device
    )

    # 2. Train Generator
    fake_image, pred_dis_fake, gen_error = train_generator(
    gen_opti,
    get_noise(model_parameter["batch_size"]).to(device),
    get_real_data_target(model_parameter["batch_size"]).to(device),
    get_hot_vector_encode(
    get_rand_labels(model_parameter["batch_size"]),
    device),
    device
    )


    # Store a random point for experience replay
    perm = torch.randperm(fake_image.size(0))
    r_idx = perm[:max(1, int(model_parameter["batch_size"] / r_frequent))]
    r_samples = add_noise_to_image(fake_image[r_idx], device)
    exp_replay = torch.cat((exp_replay, r_samples), 0).detach()

    if exp_replay.size(0) >= model_parameter["batch_size"]:
    # Train on experienced data
    dis_opti.zero_grad()

    # NOTE(review): the replay labels are an all-zero float array, so every
    # replayed sample is presented as class 0 rather than with the label it was
    # generated for, and a float array is used as an index inside
    # get_hot_vector_encode -- both look unintended; confirm.
    r_label = get_hot_vector_encode(torch.zeros(exp_replay.size(0)).numpy(), device)
    pred_dis_real = discriminator(exp_replay, r_label)
    error_real = loss(pred_dis_real, get_fake_data_target(exp_replay.size(0)).to(device))

    error_real.backward()

    dis_opti.step()

    print(f'Epoch: [{num_epoch}/{model_parameter["Epochs"]}] '
    f'Batch: Replay/Experience batch '
    f'Loss_D: {error_real.data.cpu()}, '
    )

    # Start a fresh replay buffer after it has been consumed.
    exp_replay = torch.tensor([]).to(device)

    logger.display_stats(epoch=num_epoch, batch_num=batch_num, dis_error=dis_error, gen_error=gen_error)

    if batch_num % 100 == 0:
    logger.log_image(fake_image[:sample_save_size], num_epoch, batch_num)

    logger.log(num_epoch, dis_error, gen_error)
    if num_epoch % num_epoch_log == 0:
    logger.log_model(num_epoch)
    logger.log_histogramm()
    logger.close(logger, fake_image[:sample_save_size], num_epoch, dis_error, gen_error)
    First link to my Code (Pastebin)
    Second link to my Code (0bin)
    结论:
    我已经实现了所有这些被认为对 GAN/DCGAN 有益的技巧(例如标签平滑),
    但我的模型的表现仍然比 PyTorch 教程中的 DCGAN 差,所以我想我的代码中可能有一个错误,但我似乎找不到它。
    再现性:
    如果你安装了我导入的库,你应该能够复制代码并运行它,如果你能找到任何东西,你可以自己查找。
    我感谢任何反馈。

    最佳答案

    我前段时间解决了这个问题,但忘记在 Stack Overflow 上发布答案。所以我就直接在这里贴出我的代码,它应该可以很好地工作。
    一些免责声明:

  • 我不太确定它是否有效,因为我一年前这样做了
  • 它用于 128x128 像素的图像(MNIST)
  • 这不是普通的 GAN 我使用了各种优化技术
  • 如果要使用它需要更改各种细节,例如训练数据集

  • 资源:
  • Multi-Scale Gradients
  • Instance Noise
  • Various tricks I used
  • More tricks

  • ``
    import torch
    from torch.autograd import Variable
    import torch.nn as nn
    import torch.nn.functional as F
    import torchvision
    import torchvision.transforms as transforms
    from torch.utils.data import DataLoader

    import pytorch_lightning as pl
    from pytorch_lightning import loggers

    from numpy.random import choice

    import os
    from pathlib import Path
    import shutil

    from collections import OrderedDict

    # Custom weight initialisation, applied to the generator and discriminator
    # via ``module.apply(weights_init)``.
    def weights_init(m):
        """Initialise conv weights from N(0, 0.02) and batch-norm weights from N(1, 0.02).

        The layer kind is decided from the class name; batch-norm biases are set
        to zero. Any other module is left untouched.
        """
        layer_kind = type(m).__name__
        if 'Conv' in layer_kind:
            nn.init.normal_(m.weight.data, 0.0, 0.02)
        elif 'BatchNorm' in layer_kind:
            nn.init.normal_(m.weight.data, 1.0, 0.02)
            nn.init.constant_(m.bias.data, 0)

    # Randomly flip some labels.
    def noisy_labels(y, p_flip=0.05):
        """Flip a fraction ``p_flip`` of the labels in ``y`` in place.

        Indices are drawn without replacement, so exactly
        ``int(p_flip * len(y))`` entries (capped at ``len(y)``) are flipped.
        The previous version sampled with replacement, so duplicate indices
        silently reduced the number of flipped labels.

        Args:
            y: Label tensor; entries along the first axis are flipped as
                ``1 - y``.
            p_flip: Fraction of labels to flip (default 5%).

        Returns:
            The same tensor ``y`` (modified in place).
        """
        n_total = y.shape[0]
        # Determine the number of labels to flip.
        n_select = min(n_total, int(p_flip * n_total))
        if n_select <= 0:
            return y
        # Choose distinct labels to flip.
        flip_ix = choice(n_total, size=n_select, replace=False)
        # Invert the labels in place.
        y[flip_ix] = 1 - y[flip_ix]
        return y

    class AddGaussianNoise(object):
        """Callable that adds Gaussian noise ``N(mean, std)`` to a tensor.

        The result is placed on the CUDA device when one is available (as
        before); on hosts without CUDA it stays on the input tensor's device
        instead of raising.
        """

        def __init__(self, mean=0.0, std=0.1):
            self.std = std
            self.mean = mean

        def __call__(self, tensor):
            target = torch.device("cuda") if torch.cuda.is_available() else tensor.device
            # Noise is sampled on the CPU and then moved, as in the original.
            noise = torch.randn(tensor.size()) * self.std + self.mean
            return tensor.to(target) + noise.to(target)

        def __repr__(self):
            return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)

    def resize2d(img, size):
        """Downscale ``img`` to ``size`` with adaptive average pooling.

        The result is detached from the autograd graph. It is moved to the CUDA
        device when one is available (as before) and otherwise stays on the
        device of ``img`` instead of raising.
        """
        target = torch.device("cuda") if torch.cuda.is_available() else img.device
        return F.adaptive_avg_pool2d(img, size).detach().to(target)

    def get_valid_labels(img):
        """Return soft "valid" targets for a batch.

        One value per batch entry (shape ``(img.shape[0], 1, 1, 1)``), computed
        as ``-0.3 * U[0, 1) + 1.1``. The tensor is moved to the CUDA device
        when one is available (as before) and otherwise left on the CPU
        instead of raising.
        """
        target = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        soft = (0.8 - 1.1) * torch.rand(img.shape[0], 1, 1, 1) + 1.1  # soft labels
        return soft.to(target)

    def get_unvalid_labels(img):
        """Return soft "invalid" targets for a batch.

        One value per batch entry (shape ``(img.shape[0], 1, 1, 1)``), computed
        as ``-0.3 * U[0, 1) + 0.3`` and then passed through ``noisy_labels``.
        The tensor is moved to the CUDA device when one is available (as
        before) and otherwise left on the CPU instead of raising.
        """
        target = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        soft = (0.0 - 0.3) * torch.rand(img.shape[0], 1, 1, 1) + 0.3  # soft labels
        return noisy_labels(soft).to(target)

    class Generator(pl.LightningModule):
        """Multi-scale-gradient DCGAN generator.

        Six spectrally normalised transposed-convolution stages upsample the
        latent tensor; 1x1 convolutions turn the output of each of the first
        five stages into an ``nc``-channel image so the discriminator can be fed
        at every scale.

        Args:
            ngf: Base number of generator feature maps.
            nc: Number of image channels.
            latent_dim: Number of latent channels.
        """

        def __init__(self, ngf, nc, latent_dim):
            super(Generator, self).__init__()
            self.ngf = ngf
            self.latent_dim = latent_dim
            self.nc = nc

            def up_stage(c_in, c_out, stride, padding):
                # Spectral-norm transposed conv -> LeakyReLU -> BatchNorm.
                return nn.Sequential(
                    nn.utils.spectral_norm(
                        nn.ConvTranspose2d(c_in, c_out, 4, stride, padding, bias=False)),
                    nn.LeakyReLU(0.2, inplace=True),
                    nn.BatchNorm2d(c_out),
                )

            # Spatial sizes below assume a latent input of size 1x1.
            self.fc0 = up_stage(latent_dim, ngf * 16, 1, 0)  # -> (ngf*16) x 4 x 4
            self.fc1 = up_stage(ngf * 16, ngf * 8, 2, 1)     # -> (ngf*8) x 8 x 8
            self.fc2 = up_stage(ngf * 8, ngf * 4, 2, 1)      # -> (ngf*4) x 16 x 16
            self.fc3 = up_stage(ngf * 4, ngf * 2, 2, 1)      # -> (ngf*2) x 32 x 32
            self.fc4 = up_stage(ngf * 2, ngf, 2, 1)          # -> (ngf) x 64 x 64

            # Final stage -> (nc) x 128 x 128, squashed with tanh.
            self.fc5 = nn.Sequential(
                nn.utils.spectral_norm(nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False)),
                nn.Tanh(),
            )

            # Multi-scale gradient: 1x1 convs turning intermediate features
            # into images.
            self.fc0_r = nn.Conv2d(ngf * 16, self.nc, 1)
            self.fc1_r = nn.Conv2d(ngf * 8, self.nc, 1)
            self.fc2_r = nn.Conv2d(ngf * 4, self.nc, 1)
            self.fc3_r = nn.Conv2d(ngf * 2, self.nc, 1)
            self.fc4_r = nn.Conv2d(ngf, self.nc, 1)

        def forward(self, input):
            """Return ``(image, img_0, img_1, img_2, img_3, img_4)``.

            ``image`` is the output of the last stage; ``img_k`` is the image
            derived from the output of stage ``fc<k>``.
            """
            stages = (self.fc0, self.fc1, self.fc2, self.fc3, self.fc4)
            to_image = (self.fc0_r, self.fc1_r, self.fc2_r, self.fc3_r, self.fc4_r)

            hidden = input
            features = []
            for stage in stages:
                hidden = stage(hidden)
                features.append(hidden)

            final_image = self.fc5(hidden)

            # Multi-scale gradient: convert the intermediate features into images.
            side_images = [project(feat) for project, feat in zip(to_image, features)]

            return (final_image, *side_images)

    class Discriminator(pl.LightningModule):
        """Multi-scale-gradient DCGAN discriminator.

        Six spectrally normalised convolution stages. After each of the first
        five stages the matching lower-resolution image is concatenated onto the
        feature maps (hence the ``+ nc`` input channels).

        Args:
            ndf: Base number of discriminator feature maps.
            nc: Number of image channels.
        """

        def __init__(self, ndf, nc):
            super(Discriminator, self).__init__()
            self.nc = nc
            self.ndf = ndf

            def down_stage(c_in, c_out):
                # Spectral-norm conv -> LeakyReLU -> BatchNorm.
                return nn.Sequential(
                    nn.utils.spectral_norm(nn.Conv2d(c_in, c_out, 4, 2, 1, bias=False)),
                    nn.LeakyReLU(0.2, inplace=True),
                    nn.BatchNorm2d(c_out),
                )

            # Spatial sizes below assume a 128 x 128 input image.
            # (nc) x 128 x 128 -> (ndf) x 64 x 64; no batch norm on the first stage.
            self.fc0 = nn.Sequential(
                nn.utils.spectral_norm(nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
            )

            # "+ nc" everywhere below: a multi-scale image is concatenated first.
            self.fc1 = down_stage(ndf + nc, ndf * 2)        # -> (ndf*2) x 32 x 32
            self.fc2 = down_stage(ndf * 2 + nc, ndf * 4)    # -> (ndf*4) x 16 x 16
            self.fc3 = down_stage(ndf * 4 + nc, ndf * 8)    # -> (ndf*8) x 8 x 8
            self.fc4 = down_stage(ndf * 8 + nc, ndf * 16)   # -> (ndf*16) x 4 x 4

            # (ndf*16 + nc) x 4 x 4 -> 1 x 1 x 1 probability.
            self.fc5 = nn.Sequential(
                nn.utils.spectral_norm(nn.Conv2d(ndf * 16 + nc, 1, 4, 1, 0, bias=False)),
                nn.Sigmoid(),
            )

        def forward(self, input, detach_or_not):
            """Score a multi-scale image pyramid.

            Args:
                input: Sequence ``(x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r)``: the
                    full-resolution image followed by the multi-scale images
                    in the order the generator returns them.
                detach_or_not: When true, every input is detached so the loss
                    does not backpropagate into whatever produced it. The
                    previous version detached only the full-resolution image,
                    which left the multi-scale inputs attached to the
                    generator's graph.

            Returns:
                Output of the final sigmoid stage.
            """
            x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r = input
            if detach_or_not:
                x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r = (
                    t.detach() for t in (x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r))

            x_0 = self.fc0(x)

            # Concatenate the matching multi-scale image before every later stage.
            x_1 = self.fc1(torch.cat((x_0, x_4_r), dim=1))
            x_2 = self.fc2(torch.cat((x_1, x_3_r), dim=1))
            x_3 = self.fc3(torch.cat((x_2, x_2_r), dim=1))
            x_4 = self.fc4(torch.cat((x_3, x_1_r), dim=1))
            x_5 = self.fc5(torch.cat((x_4, x_0_r), dim=1))

            return x_5

    class DCGAN(pl.LightningModule):
        """Lightning module that trains the multi-scale-gradient DCGAN.

        Optimizer index 0 updates the generator, index 1 the discriminator.

        Args:
            hparams: Namespace with the hyperparameters read below (``ngf``,
                ``ndf``, ``nc``, ``latent_dim``, ``level_of_noise``, ``epochs``,
                ``lr_gen``, ``lr_dis``, ``b1``, ``b2``, ``image_size``,
                ``num_workers``, ``batch_size``, ``save_model_every_epoch``).
            checkpoint_folder: Folder checkpoints are written to.
            experiment_name: Prefix of the checkpoint file names.
        """

        def __init__(self, hparams, checkpoint_folder, experiment_name):
            super().__init__()
            self.hparams = hparams
            self.checkpoint_folder = checkpoint_folder
            self.experiment_name = experiment_name

            # networks
            self.generator = Generator(ngf=hparams.ngf, nc=hparams.nc, latent_dim=hparams.latent_dim)
            self.discriminator = Discriminator(ndf=hparams.ndf, nc=hparams.nc)
            self.generator.apply(weights_init)
            self.discriminator.apply(weights_init)

            # cache for generated images
            self.generated_imgs = None
            self.last_imgs = None

            # For experience replay
            self.exp_replay_dis = torch.tensor([])

        def forward(self, z):
            """Run the generator on latent input ``z``."""
            return self.generator(z)

        def adversarial_loss(self, y_hat, y):
            """Binary cross-entropy between predictions ``y_hat`` and targets ``y``."""
            return F.binary_cross_entropy(y_hat, y)

        def training_step(self, batch, batch_nb, optimizer_idx):
            """Compute the generator loss (index 0) or discriminator loss (index 1)."""
            # Instance noise whose std decays linearly with the epoch and is
            # clamped at 0; see
            # https://www.inference.vc/instance-noise-a-trick-for-stabilising-gan-training/
            level = self.hparams.level_of_noise
            std_gaussian = max(0, level - ((level * 2) * (self.current_epoch / self.hparams.epochs)))
            AddGaussianNoiseInst = AddGaussianNoise(std=std_gaussian)

            imgs, _ = batch
            imgs = AddGaussianNoiseInst(imgs)  # instance noise on the real images
            self.last_imgs = imgs

            # train generator
            if optimizer_idx == 0:
                # sample noise
                z = torch.randn(imgs.shape[0], self.hparams.latent_dim, 1, 1).cuda()

                # generate images (cached for the discriminator step)
                self.generated_imgs = self(z)

                # The generator is scored against "valid" targets;
                # generated_imgs[0] is the full-resolution image.
                g_loss = self.adversarial_loss(
                    self.discriminator(self.generated_imgs, False),
                    get_valid_labels(self.generated_imgs[0]))

                tqdm_dict = {'g_loss': g_loss}
                log = {'g_loss': g_loss, "std_gaussian": std_gaussian}
                output = OrderedDict({
                    'loss': g_loss,
                    'progress_bar': tqdm_dict,
                    'log': log
                })
                return output

            # train discriminator
            if optimizer_idx == 1:
                # How well can it label real images as real? The real image is
                # passed together with downscaled copies for every scale.
                real_pyramid = [imgs, resize2d(imgs, 4), resize2d(imgs, 8), resize2d(imgs, 16),
                                resize2d(imgs, 32), resize2d(imgs, 64)]
                real_loss = self.adversarial_loss(
                    self.discriminator(real_pyramid, False), get_valid_labels(imgs))

                # How well can it label generated images as fake?
                fake_loss = self.adversarial_loss(
                    self.discriminator(self.generated_imgs, True),
                    get_unvalid_labels(self.generated_imgs[0]))

                # discriminator loss is the average of these
                d_loss = (real_loss + fake_loss) / 2

                tqdm_dict = {'d_loss': d_loss}
                log = {'d_loss': d_loss, "std_gaussian": std_gaussian}
                output = OrderedDict({
                    'loss': d_loss,
                    'progress_bar': tqdm_dict,
                    'log': log
                })
                return output

        def configure_optimizers(self):
            """Return the Adam optimizers for generator and discriminator (in that order)."""
            lr_gen = self.hparams.lr_gen
            lr_dis = self.hparams.lr_dis
            b1 = self.hparams.b1
            b2 = self.hparams.b2

            opt_g = torch.optim.Adam(self.generator.parameters(), lr=lr_gen, betas=(b1, b2))
            opt_d = torch.optim.Adam(self.discriminator.parameters(), lr=lr_dis, betas=(b1, b2))
            return [opt_g, opt_d], []

        def backward(self, trainer, loss, optimizer, optimizer_idx: int) -> None:
            # The graph is retained because the generated images cached in the
            # generator step are reused in the discriminator step.
            loss.backward(retain_graph=True)

        def train_dataloader(self):
            """Build the training ``DataLoader`` from an image folder."""
            transform = transforms.Compose([
                transforms.Resize((self.hparams.image_size, self.hparams.image_size)),
                transforms.ToTensor(),
                transforms.Normalize([0.5], [0.5])
            ])
            train_dataset = torchvision.datasets.ImageFolder(
                root="ghibli_dataset_small_overfit/",
                transform=transform
            )
            return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True,
                              batch_size=self.hparams.batch_size)

        def on_epoch_end(self):
            """Save four sample images and, every N epochs, a checkpoint."""
            z = torch.randn(4, self.hparams.latent_dim, 1, 1).cuda()
            # match gpu device (or keep as cpu)
            if self.on_gpu:
                z = z.cuda(self.last_imgs.device.index)

            # log sampled images ([0] is the full-resolution output)
            sample_imgs = self.generator(z)[0]
            torchvision.utils.save_image(sample_imgs, f'generated_images_epoch{self.current_epoch}.png')

            # save model
            if self.current_epoch % self.hparams.save_model_every_epoch == 0:
                # Use the trainer attached to this module; the previous code
                # depended on a module-level variable named ``trainer``.
                self.trainer.save_checkpoint(os.path.join(
                    self.checkpoint_folder,
                    self.experiment_name + "_epoch_" + str(self.current_epoch) + ".ckpt"))

    from argparse import Namespace

    # Hyperparameters handed to the DCGAN module (also read via args["epochs"]).
    args = dict(
        batch_size=128,  # batch size
        lr_gen=0.0003,  # TTUR; generator learning rate; tested value: 0.0002
        lr_dis=0.0003,  # TTUR; discriminator learning rate; tested value: 0.0002
        b1=0.5,  # Adam beta1; tested value (DCGAN paper): 0.5
        b2=0.999,  # Adam beta2; tested value (DCGAN paper): 0.999
        latent_dim=256,  # tested value which worked (in V4_1): 100
        nc=3,  # number of colour channels
        ndf=8,  # number of discriminator features
        ngf=8,  # number of generator features
        epochs=4,  # maximum number of epochs to run
        save_model_every_epoch=1,  # how often the model is saved
        image_size=128,  # size of the image
        num_workers=3,
        level_of_noise=0.1,  # std of the instance noise; tested values: 0.15 and 0.1
        experience_save_per_batch=1,  # should be very low; tested value which works: 1
        experience_batch_size=50,  # should not be too high; tested value which works: 50
    )
    hparams = Namespace(**args)

    # Run bookkeeping
    experiment_name = "DCGAN_6_2_MNIST_128px"
    dataset_name = "mnist"
    checkpoint_folder = "DCGAN/"
    tags = ["DCGAN", "128x128"]
    dirpath = Path(checkpoint_folder)

    # Model
    net = DCGAN(hparams, checkpoint_folder, experiment_name)

    torch.autograd.set_detect_anomaly(True)

    # To resume, pass e.g. resume_from_checkpoint="DCGAN_V4_2_GHIBLI_epoch_999.ckpt".
    trainer = pl.Trainer(max_epochs=args["epochs"], gpus=1)

    trainer.fit(net)
    ``

    关于python - DCGAN 调试。得到只是垃圾,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60421475/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com