Source code for deeprobust.image.defense.pgdtraining

"""
This is an implementation of pgd adversarial training.
References
----------
.. [1] Mądry, A., Makelov, A., Schmidt, L., Tsipras, D., & Vladu, A. (2017).
Towards Deep Learning Models Resistant to Adversarial Attacks. stat, 1050, 9.
"""

import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
import torch.nn.functional as F

import numpy as np
from PIL import Image
from deeprobust.image.attack.pgd import PGD
from deeprobust.image.netmodels.CNN import Net
from deeprobust.image.defense.base_defense import BaseDefense


class PGDtraining(BaseDefense):
    """
    PGD adversarial training.
    """

    def __init__(self, model, device):
        if not torch.cuda.is_available():
            print('CUDA is not available, using cpu...')
            self.device = 'cpu'
        else:
            self.device = device

        self.model = model
    def generate(self, train_loader, test_loader, **kwargs):
        """Call this function to generate a robust model.

        Parameters
        ----------
        train_loader :
            training data loader
        test_loader :
            testing data loader
        kwargs :
            optional keyword arguments; see parse_params
        """
        self.parse_params(**kwargs)

        torch.manual_seed(100)
        device = torch.device(self.device)

        optimizer = optim.Adam(self.model.parameters(), self.lr)
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[75, 100], gamma=0.1)

        for epoch in range(1, self.epoch + 1):
            print('Training epoch: ', epoch, flush=True)

            self.train(device, train_loader, optimizer, epoch)
            self.test(self.model, device, test_loader)

            if self.save_model and epoch % self.save_per_epoch == 0:
                if not os.path.isdir(str(self.save_dir)):
                    print("make new directory and save model in " + str(self.save_dir))
                    os.mkdir(str(self.save_dir))
                torch.save(self.model.state_dict(),
                           os.path.join(str(self.save_dir),
                                        self.save_name + '_epoch' + str(epoch) + '.pth'))
                print("model saved in " + str(self.save_dir))

            scheduler.step()

        return self.model
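    # With the defaults parsed below, the checkpoint written at, say, epoch 10 would
    # land at './defense_models/mnist_pgdtraining_0.3_epoch10.pth' (illustrative path
    # only; it depends on save_dir, save_name and save_per_epoch).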
    def parse_params(self,
                     epoch_num=100,
                     save_dir="./defense_models",
                     save_name="mnist_pgdtraining_0.3",
                     save_model=True,
                     epsilon=8.0 / 255.0,
                     num_steps=10,
                     perturb_step_size=0.01,
                     lr=0.1,
                     momentum=0.1,
                     save_per_epoch=10):
        """Parameter parser.

        Parameters
        ----------
        epoch_num : int
            number of training epochs
        save_dir : str
            directory to save the model in
        save_name : str
            model name
        save_model : bool
            whether to save the model
        epsilon : float
            attack constraint (maximum l_inf perturbation)
        num_steps : int
            number of PGD attack iterations
        perturb_step_size : float
            perturbation step size
        lr : float
            learning rate for the adversarial training process
        momentum : float
            momentum for the optimizer
        save_per_epoch : int
            save the model every save_per_epoch epochs
        """
        self.epoch = epoch_num
        self.save_model = save_model
        self.save_dir = save_dir
        self.save_name = save_name
        self.epsilon = epsilon
        self.num_steps = num_steps
        self.perturb_step_size = perturb_step_size
        self.lr = lr
        self.momentum = momentum
        self.save_per_epoch = save_per_epoch
    def train(self, device, train_loader, optimizer, epoch):
        """Training process.

        Parameters
        ----------
        device :
            device (cpu or cuda)
        train_loader :
            training data loader
        optimizer :
            optimizer
        epoch :
            training epoch
        """
        self.model.train()
        correct = 0
        seen = 0
        scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[70], gamma=0.1)

        for batch_idx, (data, target) in enumerate(train_loader):

            optimizer.zero_grad()

            data, target = data.to(device), target.to(device)

            # craft adversarial examples for the current batch and train on them
            data_adv, output = self.adv_data(data, target, ep=self.epsilon,
                                             num_steps=self.num_steps,
                                             perturb_step_size=self.perturb_step_size)
            loss = self.calculate_loss(output, target)

            loss.backward()
            optimizer.step()

            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            seen += len(data)

            # print (and reset) the running training accuracy every 20 batches
            if batch_idx % 20 == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tAccuracy: {:.2f}%'.format(
                      epoch, batch_idx * len(data), len(train_loader.dataset),
                      100. * batch_idx / len(train_loader), loss.item(),
                      100. * correct / seen))
                correct = 0
                seen = 0

        scheduler.step()
    def test(self, model, device, test_loader):
        """Testing process.

        Parameters
        ----------
        model :
            model to be evaluated
        device :
            device (cpu or cuda)
        test_loader :
            testing data loader
        """
        model.eval()

        test_loss = 0
        correct = 0
        test_loss_adv = 0
        correct_adv = 0

        for data, target in test_loader:
            data, target = data.to(device), target.to(device)

            # clean accuracy
            output = model(data)
            test_loss += F.cross_entropy(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

            # adversarial accuracy
            data_adv, output_adv = self.adv_data(data, target, ep=self.epsilon, num_steps=self.num_steps)
            test_loss_adv += self.calculate_loss(output_adv, target, redmode='sum').item()  # sum up batch loss
            pred_adv = output_adv.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct_adv += pred_adv.eq(target.view_as(pred_adv)).sum().item()

        test_loss /= len(test_loader.dataset)
        test_loss_adv /= len(test_loader.dataset)

        print('\nTest set: Clean loss: {:.3f}, Clean Accuracy: {}/{} ({:.0f}%)\n'.format(
              test_loss, correct, len(test_loader.dataset),
              100. * correct / len(test_loader.dataset)))
        print('\nTest set: Adv loss: {:.3f}, Adv Accuracy: {}/{} ({:.0f}%)\n'.format(
              test_loss_adv, correct_adv, len(test_loader.dataset),
              100. * correct_adv / len(test_loader.dataset)))
    def adv_data(self, data, target, ep=0.3, num_steps=10, perturb_step_size=0.01):
        """Generate adversarial input data for training.
        """
        adversary = PGD(self.model)
        data_adv = adversary.generate(data, target.flatten(), epsilon=ep,
                                      num_steps=num_steps, step_size=perturb_step_size)
        output = self.model(data_adv)

        return data_adv, output
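    # The attack can also be used on its own; the call below mirrors the signature
    # used in adv_data and is only an illustrative sketch (x is a batch of inputs,
    # y the corresponding labels):
    #
    #     adversary = PGD(model)
    #     x_adv = adversary.generate(x, y, epsilon=8.0 / 255.0, num_steps=10, step_size=0.01)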
    def calculate_loss(self, output, target, redmode='mean'):
        """Calculate loss for training.
        """
        loss = F.cross_entropy(output, target, reduction=redmode)
        return loss
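

# ------------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module): it assumes the
# default constructor of deeprobust.image.netmodels.CNN.Net and standard
# torchvision MNIST loaders under './data'; adjust paths and hyperparameters
# to your own setup.
if __name__ == "__main__":
    from torch.utils.data import DataLoader

    train_loader = DataLoader(
        datasets.MNIST('./data', train=True, download=True,
                       transform=transforms.ToTensor()),
        batch_size=128, shuffle=True)
    test_loader = DataLoader(
        datasets.MNIST('./data', train=False, download=True,
                       transform=transforms.ToTensor()),
        batch_size=1000, shuffle=False)

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = Net().to(device)  # assumes Net() accepts default arguments

    defense = PGDtraining(model, device)
    defense.generate(train_loader, test_loader,
                     epoch_num=50,
                     epsilon=0.3,            # common MNIST budget used in [1]
                     num_steps=10,
                     perturb_step_size=0.01,
                     save_name='mnist_pgdtraining_0.3')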