In 2014, Goodfellow et al. presented a method for training generative models called Generative Adversarial Networks (GANs for short). In a GAN, we build two different neural networks. Our first network is a traditional classification network, called the discriminator. We will train the discriminator to take images, and classify them as being real (belonging to the training set) or fake (not present in the training set). Our other network, called the generator, will take random noise as input and transform it using a neural network to produce images. The goal of the generator is to fool the discriminator into thinking the images it produced are real.
We can think of this back and forth process of the generator ($G$) trying to fool the discriminator ($D$), and the discriminator trying to correctly classify real vs. fake as a minimax game: $$\underset{G}{\text{minimize}}\; \underset{D}{\text{maximize}}\; \mathbb{E}_{x \sim p_\text{data}}\left[\log D(x)\right] + \mathbb{E}_{z \sim p(z)}\left[\log \left(1-D(G(z))\right)\right]$$ where $z \sim p(z)$ are the random noise samples, $G(z)$ are the generated images using the neural network generator $G$, and $D$ is the output of the discriminator, specifying the probability of an input being real. In Goodfellow et al., they analyze this minimax game and show how it relates to minimizing the Jensen-Shannon divergence between the training data distribution and the generated samples from $G$.
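To optimize this minimax game, we will alternate between taking gradient descent steps on the objective for $G$, and gradient ascent steps on the objective for $D$:
1. Update the generator ($G$) to minimize the probability of the discriminator making the correct choice.
2. Update the discriminator ($D$) to maximize the probability of the discriminator making the correct choice.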
While these updates are useful for analysis, they do not perform well in practice. Instead, we will use a different objective when we update the generator: maximize the probability of the discriminator making the incorrect choice. This small change helps to alleviate problems with the generator gradient vanishing when the discriminator is confident. This is the standard update used in most GAN papers, and was used in the original paper from Goodfellow et al.
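In this assignment, we will alternate the following updates:
1. Update the generator ($G$) to maximize the probability of the discriminator making the incorrect choice on generated data: $$\underset{G}{\text{maximize}}\; \mathbb{E}_{z \sim p(z)}\left[\log D(G(z))\right]$$
2. Update the discriminator ($D$) to maximize the probability of the discriminator making the correct choice on real and generated data: $$\underset{D}{\text{maximize}}\; \mathbb{E}_{x \sim p_\text{data}}\left[\log D(x)\right] + \mathbb{E}_{z \sim p(z)}\left[\log \left(1-D(G(z))\right)\right]$$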
import torch
import torch.nn as nn
from torch.nn import init
import torchvision
import torchvision.transforms as T
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data import sampler
import torchvision.datasets as dset
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
%matplotlib inline
# Notebook-wide matplotlib defaults used by every figure drawn below.
plt.rcParams['figure.figsize'] = (10.0, 8.0) # set default size of plots
# Draw pixels without smoothing so small images are shown exactly as stored.
plt.rcParams['image.interpolation'] = 'nearest'
# Default colormap for imshow: grayscale.
plt.rcParams['image.cmap'] = 'gray'
def show_images(images):
    """
    Draw a batch of square images on a near-square grid of subplots.

    Inputs:
    - images: array with a leading batch dimension; every sample is flattened
      and then reshaped to (side, side), where side = ceil(sqrt(#pixels)).
    """
    # Flatten each sample so the batch is (batch_size, D).
    flat = np.reshape(images, [images.shape[0], -1])
    grid_side = int(np.ceil(np.sqrt(flat.shape[0])))
    img_side = int(np.ceil(np.sqrt(flat.shape[1])))

    plt.figure(figsize=(grid_side, grid_side))
    grid = gridspec.GridSpec(grid_side, grid_side)
    grid.update(wspace=0.05, hspace=0.05)

    for idx in range(flat.shape[0]):
        ax = plt.subplot(grid[idx])
        plt.axis('off')
        ax.set_xticklabels([])
        ax.set_yticklabels([])
        ax.set_aspect('equal')
        plt.imshow(flat[idx].reshape([img_side, img_side]))
def preprocess_img(x):
    """Apply the affine map x -> 2x - 1 (e.g. [0, 1] becomes [-1, 1])."""
    doubled = 2 * x
    return doubled - 1.0
def deprocess_img(x):
    """Apply the affine map x -> (x + 1) / 2 (e.g. [-1, 1] becomes [0, 1])."""
    shifted = x + 1.0
    return shifted / 2.0
def rel_error(x, y):
    """
    Return the largest element-wise relative error between x and y.

    The denominator |x| + |y| is floored at 1e-8 so that two zeros compare
    as equal instead of dividing by zero.
    """
    abs_diff = np.abs(x - y)
    scale = np.maximum(1e-8, np.abs(x) + np.abs(y))
    return np.max(abs_diff / scale)
def count_params(model):
    """
    Count the scalar parameters of a PyTorch model.

    Inputs:
    - model: object exposing .parameters() that yields tensors
      (e.g. any nn.Module).

    Returns:
    - Total number of elements over all parameter tensors, as a Python int
      (0 for a model without parameters).
    """
    return sum(p.numel() for p in model.parameters())
# Reference values for the loss checks further down. np.load on an .npz
# returns a lazy archive that keeps the file open; copy every array into a
# plain dict and close the archive right away.
with np.load('gan-checks-tf.npz') as _checks:
    answers = dict(_checks)
class ChunkSampler(sampler.Sampler):
    """Yield a contiguous run of dataset indices.

    The indices produced are start, start + 1, ..., start + num_samples - 1.

    Arguments:
        num_samples: how many indices to produce
        start: first index of the run (defaults to 0)
    """

    def __init__(self, num_samples, start=0):
        self.start = start
        self.num_samples = num_samples

    def __iter__(self):
        stop = self.start + self.num_samples
        return iter(range(self.start, stop))

    def __len__(self):
        return self.num_samples
# Split of the MNIST "train" set: the first NUM_TRAIN images are used for
# training, the next NUM_VAL for validation.
NUM_TRAIN = 50000
NUM_VAL = 5000

NOISE_DIM = 96
batch_size = 128

mnist_train = dset.MNIST('./utils/datasets/MNIST_data', train=True, download=True,
                         transform=T.ToTensor())
loader_train = DataLoader(mnist_train, batch_size=batch_size,
                          sampler=ChunkSampler(NUM_TRAIN, 0))

# Same underlying dataset; the sampler offset selects the validation slice.
mnist_val = dset.MNIST('./utils/datasets/MNIST_data', train=True, download=True,
                       transform=T.ToTensor())
loader_val = DataLoader(mnist_val, batch_size=batch_size,
                        sampler=ChunkSampler(NUM_VAL, NUM_TRAIN))

# Preview the first training batch. Use the iterator protocol (next(iter(...)))
# rather than the Python 2 style `.next()` method, which DataLoader iterators
# no longer provide.
imgs = next(iter(loader_train))[0].view(batch_size, 784).numpy().squeeze()
show_images(imgs)
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ./utils/datasets/MNIST_data/MNIST/raw/train-images-idx3-ubyte.gz
HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))
Extracting ./utils/datasets/MNIST_data/MNIST/raw/train-images-idx3-ubyte.gz to ./utils/datasets/MNIST_data/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to ./utils/datasets/MNIST_data/MNIST/raw/train-labels-idx1-ubyte.gz
HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))
Extracting ./utils/datasets/MNIST_data/MNIST/raw/train-labels-idx1-ubyte.gz to ./utils/datasets/MNIST_data/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to ./utils/datasets/MNIST_data/MNIST/raw/t10k-images-idx3-ubyte.gz
HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))
Extracting ./utils/datasets/MNIST_data/MNIST/raw/t10k-images-idx3-ubyte.gz to ./utils/datasets/MNIST_data/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to ./utils/datasets/MNIST_data/MNIST/raw/t10k-labels-idx1-ubyte.gz
HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))
Extracting ./utils/datasets/MNIST_data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./utils/datasets/MNIST_data/MNIST/raw
Processing...
Done!
/usr/local/lib/python3.6/dist-packages/torchvision/datasets/mnist.py:469: UserWarning: The given NumPy array is not writeable, and PyTorch does not support non-writeable tensors. This means you can write to the underlying (supposedly non-writeable) NumPy array using the tensor. You may want to copy the array to protect its data or make it writeable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:141.)
return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)
Generate uniform noise from -1 to 1 with shape `[batch_size, dim]`.
def sample_noise(batch_size, dim):
    """
    Draw a batch of uniform random noise vectors.

    Input:
    - batch_size: Integer, number of noise vectors to draw.
    - dim: Integer, length of each noise vector.

    Output:
    - A float32 PyTorch Tensor of shape (batch_size, dim) filled in place
      with uniform_(-1, 1).
    """
    noise = torch.empty(batch_size, dim, dtype=torch.float32)
    noise.uniform_(-1, 1)
    return noise
Check noise is the correct shape and type:
def test_sample_noise():
    """Check the shape, type and value range of sample_noise output."""
    rows, cols = 3, 4
    torch.manual_seed(231)
    noise = sample_noise(rows, cols)
    noise_np = noise.cpu().numpy()
    assert noise_np.shape == (rows, cols)
    assert torch.is_tensor(noise)
    # All values lie in [-1, 1] ...
    assert np.all(noise_np >= -1.0) and np.all(noise_np <= 1.0)
    # ... and both signs occur, so the range is not just [0, 1].
    assert np.any(noise_np < 0.0) and np.any(noise_np > 0.0)
    print('All tests passed!')


test_sample_noise()
All tests passed!
class Flatten(nn.Module):
    """
    Collapse every dimension after the first into one.

    An input of shape (N, d1, d2, ...) becomes (N, d1 * d2 * ...). The usual
    case is (N, C, H, W) -> (N, C*H*W), but any rank >= 1 is accepted.
    """

    def forward(self, x):
        # reshape behaves like view for contiguous input and also accepts
        # non-contiguous tensors.
        return x.reshape(x.size(0), -1)
class Unflatten(nn.Module):
    """
    Reshape a flat batch back into image form.

    Takes an input of shape (N, C*H*W) and returns a view of shape
    (N, C, H, W), using the N, C, H, W given at construction (N=-1 lets the
    batch size be inferred).
    """

    def __init__(self, N=-1, C=128, H=7, W=7):
        super().__init__()
        self.N, self.C, self.H, self.W = N, C, H, W

    def forward(self, x):
        target_shape = (self.N, self.C, self.H, self.W)
        return x.view(*target_shape)
def initialize_weights(m):
    """
    Xavier-uniform initialise the weight of a Linear or ConvTranspose2d module.

    Intended for use with model.apply(...); modules of any other type are
    left untouched.
    """
    if not isinstance(m, (nn.Linear, nn.ConvTranspose2d)):
        return
    init.xavier_uniform_(m.weight.data)
# Tensor type used for every model and batch in this notebook. Select the
# CUDA type only when a GPU is actually available, so the notebook runs
# unmodified on CPU-only machines.
dtype = torch.cuda.FloatTensor if torch.cuda.is_available() else torch.FloatTensor
def discriminator():
    """
    Build and return the fully connected discriminator.

    Layers: Flatten -> Linear(784, 256) -> LeakyReLU -> Linear(256, 256)
    -> LeakyReLU -> Linear(256, 1). The single output is an unnormalised
    score (no sigmoid is applied here).
    """
    layers = [
        Flatten(),
        nn.Linear(784, 256),
        nn.LeakyReLU(inplace=True),
        nn.Linear(256, 256),
        nn.LeakyReLU(inplace=True),
        nn.Linear(256, 1),
    ]
    return nn.Sequential(*layers)
Test to make sure the number of parameters in the discriminator is correct:
def test_discriminator(true_count=267009):
    """
    Print whether discriminator() has the expected number of parameters.

    Inputs:
    - true_count: expected total parameter count.
    """
    model = discriminator()
    cur_count = count_params(model)
    if cur_count != true_count:
        print('Incorrect number of parameters in discriminator. Check your architecture.')
    else:
        print('Correct number of parameters in discriminator.')


test_discriminator()
Correct number of parameters in discriminator.
def generator(noise_dim=NOISE_DIM):
    """
    Build and return the fully connected generator.

    Layers: Linear(noise_dim, 1024) -> ReLU -> Linear(1024, 1024) -> ReLU
    -> Linear(1024, 784) -> Tanh, so each output row has 784 values in
    [-1, 1].
    """
    layers = [
        nn.Linear(noise_dim, 1024),
        nn.ReLU(inplace=True),
        nn.Linear(1024, 1024),
        nn.ReLU(inplace=True),
        nn.Linear(1024, 784),
        nn.Tanh(),
    ]
    return nn.Sequential(*layers)
Test to make sure the number of parameters in the generator is correct:
def test_generator(true_count=1858320):
    """
    Print whether generator(4) has the expected number of parameters.

    Inputs:
    - true_count: expected total parameter count for a noise dimension of 4.
    """
    model = generator(4)
    cur_count = count_params(model)
    if cur_count != true_count:
        print('Incorrect number of parameters in generator. Check your architecture.')
    else:
        print('Correct number of parameters in generator.')


test_generator()
Correct number of parameters in generator.
Compute the generator and discriminator loss. The generator loss is: $$\ell_G = -\mathbb{E}_{z \sim p(z)}\left[\log D(G(z))\right]$$ and the discriminator loss is: $$ \ell_D = -\mathbb{E}_{x \sim p_\text{data}}\left[\log D(x)\right] - \mathbb{E}_{z \sim p(z)}\left[\log \left(1-D(G(z))\right)\right]$$
def bce_loss(input, target):
    """
    Numerically stable binary cross-entropy computed from raw scores (logits).

    Inputs:
    - input: PyTorch Tensor of shape (N,) or (N, 1) giving scores.
    - target: PyTorch Tensor with the same number of elements as input,
      containing 0 and 1 giving targets.

    Returns:
    - A PyTorch Tensor containing the mean BCE loss over the minibatch of input data.
    """
    # An (N, 1) input against an (N,) target would silently broadcast to
    # (N, N); align the target with the input so the loss stays per-sample.
    if target.shape != input.shape and target.numel() == input.numel():
        target = target.reshape(input.shape)
    # max(x, 0) - x * t + log(1 + exp(-|x|)): exp never sees a positive
    # argument, so it cannot overflow; log1p keeps precision when the
    # exponential is tiny.
    loss = input.clamp(min=0) - input * target + torch.log1p(torch.exp(-input.abs()))
    return loss.mean()
def discriminator_loss(logits_real, logits_fake):
    """
    Computes the discriminator loss described above.

    Real scores are pushed towards label 1 and fake scores towards label 0.

    Inputs:
    - logits_real: PyTorch Tensor of shape (N,) or (N, 1) giving scores for the real data.
    - logits_fake: PyTorch Tensor of shape (N,) or (N, 1) giving scores for the fake data.

    Returns:
    - loss: PyTorch Tensor containing (scalar) the loss for the discriminator.
    """
    # Build the labels with *_like so they share the logits' shape, dtype and
    # device; an (N,) label against (N, 1) logits would broadcast to (N, N).
    real_labels = torch.ones_like(logits_real)
    fake_labels = torch.zeros_like(logits_fake)
    loss = bce_loss(logits_real, real_labels) + bce_loss(logits_fake, fake_labels)
    return loss
def generator_loss(logits_fake):
    """
    Computes the generator loss described above.

    The generator is rewarded when the discriminator scores its samples as
    real, so the fake scores are compared against label 1.

    Inputs:
    - logits_fake: PyTorch Tensor of shape (N,) or (N, 1) giving scores for the fake data.

    Returns:
    - loss: PyTorch Tensor containing the (scalar) loss for the generator.
    """
    # ones_like keeps the labels on the same shape/dtype/device as the logits
    # and avoids an accidental (N, 1) x (N,) broadcast.
    loss = bce_loss(logits_fake, torch.ones_like(logits_fake))
    return loss
Check generator and discriminator loss. We should see errors < 1e-7.
def test_discriminator_loss(logits_real, logits_fake, d_loss_true):
    """Print the relative error of discriminator_loss against a reference value."""
    real_t = torch.Tensor(logits_real).type(dtype)
    fake_t = torch.Tensor(logits_fake).type(dtype)
    d_loss = discriminator_loss(real_t, fake_t).cpu().numpy()
    err = rel_error(d_loss_true, d_loss)
    print("Maximum error in d_loss: %g" % err)


test_discriminator_loss(answers['logits_real'], answers['logits_fake'],
                        answers['d_loss_true'])
Maximum error in d_loss: 2.83811e-08
def test_generator_loss(logits_fake, g_loss_true):
    """Print the relative error of generator_loss against a reference value."""
    fake_t = torch.Tensor(logits_fake).type(dtype)
    g_loss = generator_loss(fake_t).cpu().numpy()
    err = rel_error(g_loss_true, g_loss)
    print("Maximum error in g_loss: %g" % err)


test_generator_loss(answers['logits_fake'], answers['g_loss_true'])
Maximum error in g_loss: 3.4188e-08
def get_optimizer(model):
    """
    Build the Adam optimizer used for both networks.

    Input:
    - model: A PyTorch model whose parameters should be optimized.

    Returns:
    - An Adam optimizer over model.parameters() with learning rate 1e-3,
      beta1=0.5 and beta2=0.999.
    """
    learning_rate = 1e-3
    adam_betas = (0.5, 0.999)
    return optim.Adam(model.parameters(), lr=learning_rate, betas=adam_betas)
def run_a_gan(D, G, D_solver, G_solver, discriminator_loss, generator_loss, show_every=250,
              batch_size=128, noise_size=96, num_epochs=10):
    """
    Train a GAN!

    Each iteration takes one discriminator step followed by one generator
    step. Batches are read from the module-level `loader_train`.

    Inputs:
    - D, G: PyTorch models for the discriminator and generator
    - D_solver, G_solver: torch.optim Optimizers to use for training the
      discriminator and generator.
    - discriminator_loss, generator_loss: Functions to use for computing the
      discriminator and generator loss, respectively.
    - show_every: Show samples after every show_every iterations.
    - batch_size: Batch size to use for training.
    - noise_size: Dimension of the noise to use as input to the generator.
    - num_epochs: Number of epochs over the training dataset to use for training.
    """
    iter_count = 0
    for _ in range(num_epochs):
        for x, _ in loader_train:
            # The views below hard-code batch_size, so skip a short final batch.
            if len(x) != batch_size:
                continue

            # ---- Discriminator step ----
            D_solver.zero_grad()
            real_data = x.type(dtype)
            # Affine rescale x -> 2x - 1 (maps [0, 1] pixels to [-1, 1];
            # assumes the loader yields pixels in [0, 1]).
            logits_real = D(2 * (real_data - 0.5)).type(dtype)

            g_fake_seed = sample_noise(batch_size, noise_size).type(dtype)
            # detach: the discriminator step must not backpropagate into G.
            fake_images = G(g_fake_seed).detach()
            logits_fake = D(fake_images.view(batch_size, 1, 28, 28))

            d_total_error = discriminator_loss(logits_real, logits_fake)
            d_total_error.backward()
            D_solver.step()

            # ---- Generator step (fresh noise, gradients flow through D into G) ----
            G_solver.zero_grad()
            g_fake_seed = sample_noise(batch_size, noise_size).type(dtype)
            fake_images = G(g_fake_seed)

            gen_logits_fake = D(fake_images.view(batch_size, 1, 28, 28))
            g_error = generator_loss(gen_logits_fake)
            g_error.backward()
            G_solver.step()

            if iter_count % show_every == 0:
                print('Iter: {}, D: {:.4}, G:{:.4}'.format(iter_count, d_total_error.item(), g_error.item()))
                # detach() is the supported replacement for the legacy .data access.
                imgs_numpy = fake_images.detach().cpu().numpy()
                show_images(imgs_numpy[0:16])
                plt.show()
                print()
            iter_count += 1
# Make the discriminator and move it to the notebook-wide tensor type
D = discriminator().type(dtype)
# Make the generator (default noise dimension) and move it to the same tensor type
G = generator().type(dtype)
# Use the function you wrote earlier to get optimizers for the Discriminator and the Generator;
# each network gets its own optimizer so the two updates stay independent
D_solver = get_optimizer(D)
G_solver = get_optimizer(G)
# Run it! Trains with the BCE-based losses and the default settings of run_a_gan
run_a_gan(D, G, D_solver, G_solver, discriminator_loss, generator_loss)
Iter: 0, D: 1.328, G:0.7202
Iter: 250, D: 1.43, G:0.6752
Iter: 500, D: 1.181, G:1.414
Iter: 750, D: 1.204, G:1.556
Iter: 1000, D: 1.174, G:1.126
Iter: 1250, D: 1.255, G:1.068
Iter: 1500, D: 1.136, G:0.971
Iter: 1750, D: 1.317, G:0.7927
Iter: 2000, D: 1.274, G:0.9762
Iter: 2250, D: 1.258, G:0.9521
Iter: 2500, D: 1.202, G:0.833
Iter: 2750, D: 1.288, G:0.8659
Iter: 3000, D: 1.379, G:0.824
Iter: 3250, D: 1.392, G:0.8353
Iter: 3500, D: 1.296, G:0.8011
Iter: 3750, D: 1.221, G:0.841
In the iterations in the low 100s we should see black backgrounds, fuzzy shapes as you approach iteration 1000, and decent shapes, about half of which will be sharp and clearly recognizable as we pass 3000.
We'll now look at Least Squares GAN, a newer, more stable alternative to the original GAN loss function. For this part, all we have to do is change the loss function and retrain the model. We'll implement equation (9) in the paper, with the generator loss: $$\ell_G = \frac{1}{2}\mathbb{E}_{z \sim p(z)}\left[\left(D(G(z))-1\right)^2\right]$$ and the discriminator loss: $$ \ell_D = \frac{1}{2}\mathbb{E}_{x \sim p_\text{data}}\left[\left(D(x)-1\right)^2\right] + \frac{1}{2}\mathbb{E}_{z \sim p(z)}\left[ \left(D(G(z))\right)^2\right]$$
def ls_discriminator_loss(scores_real, scores_fake):
    """
    Compute the Least-Squares GAN loss for the discriminator.

    loss = 0.5 * mean((scores_real - 1)^2) + 0.5 * mean(scores_fake^2)

    Inputs:
    - scores_real: PyTorch Tensor of shape (N,) or (N, 1) giving scores for the real data.
    - scores_fake: PyTorch Tensor of shape (N,) or (N, 1) giving scores for the fake data.

    Outputs:
    - loss: A PyTorch Tensor containing the loss.
    """
    # Subtract the scalar target directly: an (N,) vector of ones against
    # (N, 1) scores would broadcast to an N x N matrix.
    real_term = 0.5 * torch.mean((scores_real - 1) ** 2)
    fake_term = 0.5 * torch.mean(scores_fake ** 2)
    return real_term + fake_term
def ls_generator_loss(scores_fake):
    """
    Computes the Least-Squares GAN loss for the generator.

    loss = 0.5 * mean((scores_fake - 1)^2)

    Inputs:
    - scores_fake: PyTorch Tensor of shape (N,) or (N, 1) giving scores for the fake data.

    Outputs:
    - loss: A PyTorch Tensor containing the loss.
    """
    # Scalar target: avoids broadcasting (N, 1) scores against an (N,) vector
    # of ones into an N x N matrix.
    return 0.5 * torch.mean((scores_fake - 1) ** 2)
Before running a GAN with our new loss function, let's check it:
def test_lsgan_loss(score_real, score_fake, d_loss_true, g_loss_true):
    """Print the relative errors of both LSGAN losses against reference values."""
    real_t = torch.Tensor(score_real).type(dtype)
    fake_t = torch.Tensor(score_fake).type(dtype)
    d_loss = ls_discriminator_loss(real_t, fake_t).cpu().numpy()
    g_loss = ls_generator_loss(fake_t).cpu().numpy()
    d_err = rel_error(d_loss_true, d_loss)
    g_err = rel_error(g_loss_true, g_loss)
    print("Maximum error in d_loss: %g" % d_err)
    print("Maximum error in g_loss: %g" % g_err)


test_lsgan_loss(answers['logits_real'], answers['logits_fake'],
                answers['d_loss_lsgan_true'], answers['g_loss_lsgan_true'])
Maximum error in d_loss: 1.64377e-08
Maximum error in g_loss: 2.7837e-09
Run the following cell to train model!
# Fresh discriminator/generator pair for the least-squares experiment, so
# nothing is shared with the networks trained above.
D_LS = discriminator().type(dtype)
G_LS = generator().type(dtype)
# One Adam optimizer per network.
D_LS_solver = get_optimizer(D_LS)
G_LS_solver = get_optimizer(G_LS)
# Same training loop as before; only the two loss functions are swapped.
run_a_gan(D_LS, G_LS, D_LS_solver, G_LS_solver, ls_discriminator_loss, ls_generator_loss)
Iter: 0, D: 0.5689, G:0.51
Iter: 250, D: 0.1481, G:0.3264
Iter: 500, D: 0.2063, G:0.4708
Iter: 750, D: 0.1258, G:0.2649
Iter: 1000, D: 0.152, G:0.4361
Iter: 1250, D: 0.1842, G:0.2598
Iter: 1500, D: 0.1986, G:0.2422
Iter: 1750, D: 0.2018, G:0.2362
Iter: 2000, D: 0.2339, G:0.1912
Iter: 2250, D: 0.2559, G:0.2198
Iter: 2500, D: 0.2503, G:0.1511
Iter: 2750, D: 0.2112, G:0.1597
Iter: 3000, D: 0.2393, G:0.1796
Iter: 3250, D: 0.2336, G:0.1621
Iter: 3500, D: 0.2206, G:0.1707
Iter: 3750, D: 0.2488, G:0.1253
def build_dc_classifier():
    """
    Build and return a PyTorch model for the DCGAN discriminator implementing
    the architecture above.

    Layers: reshape to (N, 1, 28, 28) -> Conv2d(1, 32, 5) -> LeakyReLU
    -> MaxPool(2) -> Conv2d(32, 64, 5) -> LeakyReLU -> MaxPool(2) -> Flatten
    -> Linear(1024, 1024) -> LeakyReLU -> Linear(1024, 1).
    """
    return nn.Sequential(
        # -1 lets the batch size be inferred, so the model is not tied to
        # the notebook's global batch_size.
        Unflatten(-1, 1, 28, 28),
        nn.Conv2d(1, 32, kernel_size=5, stride=1),
        nn.LeakyReLU(inplace=True),
        nn.MaxPool2d(2, 2),
        nn.Conv2d(32, 64, kernel_size=5, stride=1),
        nn.LeakyReLU(inplace=True),
        nn.MaxPool2d(2, 2),
        Flatten(),
        # 28 -> 24 -> 12 -> 8 -> 4 spatially, so 64 * 4 * 4 = 1024 features.
        nn.Linear(1024, 1024),
        nn.LeakyReLU(inplace=True),
        nn.Linear(1024, 1),
    )
# Smoke test: push the images of the first training batch through a fresh
# classifier and print the output shape (one score per image is expected).
data = next(iter(loader_train))[0].type(dtype)
b = build_dc_classifier().type(dtype)
out = b(data)
print(out.size())
torch.Size([128, 1])
Check the number of parameters in your classifier as a sanity check:
def test_dc_classifer(true_count=1102721):
    """
    Print whether build_dc_classifier() has the expected number of parameters.

    Inputs:
    - true_count: expected total parameter count.
    """
    model = build_dc_classifier()
    cur_count = count_params(model)
    # The model under test is the DCGAN discriminator, so report it as such.
    if cur_count != true_count:
        print('Incorrect number of parameters in discriminator. Check your architecture.')
    else:
        print('Correct number of parameters in discriminator.')


test_dc_classifer()
Correct number of parameters in generator.
def build_dc_generator(noise_dim=NOISE_DIM):
    """
    Build and return a PyTorch model implementing the DCGAN generator using
    the architecture described above.

    Layers: Linear(noise_dim, 1024) -> ReLU -> BatchNorm1d
    -> Linear(1024, 6272) -> ReLU -> BatchNorm1d -> reshape to (N, 128, 7, 7)
    -> ConvTranspose2d(128, 64, 4, stride 2, pad 1) -> ReLU -> BatchNorm2d
    -> ConvTranspose2d(64, 1, 4, stride 2, pad 1) -> Tanh -> Flatten.
    The output has shape (N, 784) with values in [-1, 1].
    """
    return nn.Sequential(
        nn.Linear(noise_dim, 1024),
        nn.ReLU(inplace=True),
        nn.BatchNorm1d(1024),
        nn.Linear(1024, 6272),
        nn.ReLU(inplace=True),
        nn.BatchNorm1d(6272),
        # 6272 = 128 * 7 * 7; -1 infers the batch size instead of pinning
        # the model to the notebook's global batch_size.
        Unflatten(-1, 128, 7, 7),
        # Each transposed conv doubles the spatial size: 7 -> 14 -> 28.
        nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
        nn.ReLU(inplace=True),
        nn.BatchNorm2d(64),
        nn.ConvTranspose2d(64, 1, kernel_size=4, stride=2, padding=1),
        nn.Tanh(),
        Flatten(),
    )
# Smoke test: run one batch of Gaussian noise through a freshly initialised
# DCGAN generator and display the output shape.
test_g_gan = build_dc_generator().type(dtype)
test_g_gan.apply(initialize_weights)

fake_seed = torch.randn(batch_size, NOISE_DIM).type(dtype)
# Call the module itself rather than .forward() so registered hooks run too.
fake_images = test_g_gan(fake_seed)
fake_images.size()
torch.Size([128, 784])
Check the number of parameters in your generator as a sanity check:
def test_dc_generator(true_count=6580801):
    """
    Print whether build_dc_generator(4) has the expected number of parameters.

    Inputs:
    - true_count: expected total parameter count for a noise dimension of 4.
    """
    model = build_dc_generator(4)
    cur_count = count_params(model)
    if cur_count != true_count:
        print('Incorrect number of parameters in generator. Check your architecture.')
    else:
        print('Correct number of parameters in generator.')


test_dc_generator()
Correct number of parameters in generator.
# Convolutional discriminator. initialize_weights only handles nn.Linear and
# nn.ConvTranspose2d, so the nn.Conv2d layers keep their default initialisation.
D_DC = build_dc_classifier().type(dtype)
D_DC.apply(initialize_weights)
# Convolutional generator; its Linear and ConvTranspose2d layers are Xavier-initialised.
G_DC = build_dc_generator().type(dtype)
G_DC.apply(initialize_weights)
# One Adam optimizer per network.
D_DC_solver = get_optimizer(D_DC)
G_DC_solver = get_optimizer(G_DC)
# Train with the BCE-based GAN losses for 5 epochs instead of the default 10.
run_a_gan(D_DC, G_DC, D_DC_solver, G_DC_solver, discriminator_loss, generator_loss, num_epochs=5)
Iter: 0, D: 1.448, G:1.464
Iter: 250, D: 1.198, G:0.7545
Iter: 500, D: 1.233, G:1.017
Iter: 750, D: 1.185, G:1.144
Iter: 1000, D: 1.198, G:1.03
Iter: 1250, D: 1.216, G:0.9504
Iter: 1500, D: 1.115, G:1.105
Iter: 1750, D: 1.069, G:0.8992