"""
Adversarial Attacks on Graph Neural Networks via Meta Learning. ICLR 2019
https://openreview.net/pdf?id=Bylnx209YX
Authors' TensorFlow implementation:
https://github.com/danielzuegner/gnn-meta-attack
"""
import math
import numpy as np
import scipy.sparse as sp
import torch
from torch import optim
from torch.nn import functional as F
from torch.nn.parameter import Parameter
from tqdm import tqdm
from deeprobust.graph import utils
from deeprobust.graph.global_attack import BaseAttack
class BaseMeta(BaseAttack):
"""Abstract base class for meta attack. Adversarial Attacks on Graph Neural
Networks via Meta Learning, ICLR 2019,
https://openreview.net/pdf?id=Bylnx209YX
Parameters
----------
model :
model to attack. Default `None`.
nnodes : int
number of nodes in the input graph
lambda_ : float
lambda_ is used to weight the two objectives in Eq. (10) in the paper.
feature_shape : tuple
shape of the input node features
attack_structure : bool
whether to attack graph structure
attack_features : bool
whether to attack node features
    device : str
'cpu' or 'cuda'
"""
def __init__(self, model=None, nnodes=None, feature_shape=None, lambda_=0.5, attack_structure=True, attack_features=False, device='cpu'):
super(BaseMeta, self).__init__(model, nnodes, attack_structure, attack_features, device)
self.lambda_ = lambda_
        assert attack_features or attack_structure, 'attack_features and attack_structure cannot both be False'
self.modified_adj = None
self.modified_features = None
if attack_structure:
assert nnodes is not None, 'Please give nnodes='
self.adj_changes = Parameter(torch.FloatTensor(nnodes, nnodes))
self.adj_changes.data.fill_(0)
if attack_features:
assert feature_shape is not None, 'Please give feature_shape='
self.feature_changes = Parameter(torch.FloatTensor(feature_shape))
self.feature_changes.data.fill_(0)
self.with_relu = model.with_relu
def get_modified_adj(self, ori_adj):
adj_changes_square = self.adj_changes - torch.diag(torch.diag(self.adj_changes, 0))
ind = np.diag_indices(self.adj_changes.shape[0])
adj_changes_symm = torch.clamp(adj_changes_square + torch.transpose(adj_changes_square, 1, 0), -1, 1)
modified_adj = adj_changes_symm + ori_adj
return modified_adj
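    # Illustrative note (not part of the original code): `adj_changes` stores
    # edge flips, so for a binary `ori_adj` an entry of +1 adds an edge and an
    # entry of -1 removes one. A minimal sketch of the symmetrization, using a
    # hypothetical `attacker` instance and a perturbed pair (0, 1):
    #
    #   >>> attacker.adj_changes.data[0, 1] = 1.0
    #   >>> m = attacker.get_modified_adj(ori_adj)
    #   >>> bool(m[0, 1] == m[1, 0])   # perturbation is applied symmetrically
    #   True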
def get_modified_features(self, ori_features):
return ori_features + self.feature_changes
    def filter_potential_singletons(self, modified_adj):
        """
        Computes a mask for entries potentially leading to singleton nodes, i.e.
        entries where one of the two incident nodes has degree 1 and there is
        an edge between the two nodes.
        """
degrees = modified_adj.sum(0)
degree_one = (degrees == 1)
resh = degree_one.repeat(modified_adj.shape[0], 1).float()
l_and = resh * modified_adj
logical_and_symmetric = l_and + l_and.t()
flat_mask = 1 - logical_and_symmetric
return flat_mask
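    # Illustrative note: the returned mask is 1 everywhere except at entries
    # (u, v) where removing the existing edge u-v would leave a degree-1 node
    # isolated. E.g., in a path graph 0-1-2 the edges 0-1 and 1-2 are masked
    # out (removing either would create a singleton), so their scores are
    # zeroed in `get_adj_score` below.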
def self_training_label(self, labels, idx_train):
# Predict the labels of the unlabeled nodes to use them for self-training.
output = self.surrogate.output
labels_self_training = output.argmax(1)
labels_self_training[idx_train] = labels[idx_train]
return labels_self_training
    def log_likelihood_constraint(self, modified_adj, ori_adj, ll_cutoff):
        """
        Computes a mask for entries that, if the corresponding edge were
        added/removed, would violate the log-likelihood constraint.
        Note that different data types (float, double) can affect the final results.
        """
t_d_min = torch.tensor(2.0).to(self.device)
t_possible_edges = np.array(np.triu(np.ones((self.nnodes, self.nnodes)), k=1).nonzero()).T
allowed_mask, current_ratio = utils.likelihood_ratio_filter(t_possible_edges,
modified_adj,
ori_adj, t_d_min,
ll_cutoff)
return allowed_mask, current_ratio
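    # Illustrative note: this delegates to utils.likelihood_ratio_filter, which
    # tests whether the degree sequences of the original and modified graphs are
    # still plausibly drawn from the same power-law distribution; edge flips
    # that would push the likelihood-ratio test statistic above `ll_cutoff`
    # are masked out.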
def get_adj_score(self, adj_grad, modified_adj, ori_adj, ll_constraint, ll_cutoff):
adj_meta_grad = adj_grad * (-2 * modified_adj + 1)
# Make sure that the minimum entry is 0.
adj_meta_grad -= adj_meta_grad.min()
# Filter self-loops
adj_meta_grad -= torch.diag(torch.diag(adj_meta_grad, 0))
        # Set entries to 0 that could lead to singleton nodes.
singleton_mask = self.filter_potential_singletons(modified_adj)
adj_meta_grad = adj_meta_grad * singleton_mask
if ll_constraint:
allowed_mask, self.ll_ratio = self.log_likelihood_constraint(modified_adj, ori_adj, ll_cutoff)
allowed_mask = allowed_mask.to(self.device)
adj_meta_grad = adj_meta_grad * allowed_mask
return adj_meta_grad
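    # Illustrative note: `adj_grad * (-2 * modified_adj + 1)` flips the sign of
    # the gradient on existing edges, so a large positive score always means
    # "flipping this entry (add if absent, remove if present) is expected to
    # increase the attack loss". The same trick is applied to features below.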
def get_feature_score(self, feature_grad, modified_features):
feature_meta_grad = feature_grad * (-2 * modified_features + 1)
feature_meta_grad -= feature_meta_grad.min()
return feature_meta_grad
class Metattack(BaseMeta):
"""Meta attack. Adversarial Attacks on Graph Neural Networks
via Meta Learning, ICLR 2019.
Examples
--------
>>> import numpy as np
>>> from deeprobust.graph.data import Dataset
>>> from deeprobust.graph.defense import GCN
>>> from deeprobust.graph.global_attack import Metattack
>>> data = Dataset(root='/tmp/', name='cora')
>>> adj, features, labels = data.adj, data.features, data.labels
>>> idx_train, idx_val, idx_test = data.idx_train, data.idx_val, data.idx_test
    >>> idx_unlabeled = np.union1d(idx_val, idx_test)
>>> # Setup Surrogate model
    >>> surrogate = GCN(nfeat=features.shape[1], nclass=labels.max().item()+1,
    ...                 nhid=16, dropout=0, with_relu=False, with_bias=False, device='cpu').to('cpu')
>>> surrogate.fit(features, adj, labels, idx_train, idx_val, patience=30)
>>> # Setup Attack Model
    >>> model = Metattack(surrogate, nnodes=adj.shape[0], feature_shape=features.shape,
    ...                   attack_structure=True, attack_features=False, device='cpu', lambda_=0).to('cpu')
>>> # Attack
>>> model.attack(features, adj, labels, idx_train, idx_unlabeled, n_perturbations=10, ll_constraint=False)
>>> modified_adj = model.modified_adj
"""
def __init__(self, model, nnodes, feature_shape=None, attack_structure=True, attack_features=False, device='cpu', with_bias=False, lambda_=0.5, train_iters=100, lr=0.1, momentum=0.9):
super(Metattack, self).__init__(model, nnodes, feature_shape, lambda_, attack_structure, attack_features, device)
self.momentum = momentum
self.lr = lr
self.train_iters = train_iters
self.with_bias = with_bias
self.weights = []
self.biases = []
self.w_velocities = []
self.b_velocities = []
self.hidden_sizes = self.surrogate.hidden_sizes
self.nfeat = self.surrogate.nfeat
self.nclass = self.surrogate.nclass
previous_size = self.nfeat
for ix, nhid in enumerate(self.hidden_sizes):
weight = Parameter(torch.FloatTensor(previous_size, nhid).to(device))
w_velocity = torch.zeros(weight.shape).to(device)
self.weights.append(weight)
self.w_velocities.append(w_velocity)
if self.with_bias:
bias = Parameter(torch.FloatTensor(nhid).to(device))
b_velocity = torch.zeros(bias.shape).to(device)
self.biases.append(bias)
self.b_velocities.append(b_velocity)
previous_size = nhid
output_weight = Parameter(torch.FloatTensor(previous_size, self.nclass).to(device))
output_w_velocity = torch.zeros(output_weight.shape).to(device)
self.weights.append(output_weight)
self.w_velocities.append(output_w_velocity)
if self.with_bias:
output_bias = Parameter(torch.FloatTensor(self.nclass).to(device))
output_b_velocity = torch.zeros(output_bias.shape).to(device)
self.biases.append(output_bias)
self.b_velocities.append(output_b_velocity)
self._initialize()
def _initialize(self):
for w, v in zip(self.weights, self.w_velocities):
stdv = 1. / math.sqrt(w.size(1))
w.data.uniform_(-stdv, stdv)
v.data.fill_(0)
        if self.with_bias:
            for b, v in zip(self.biases, self.b_velocities):
                # Use the bias's own size for the init range; the original code
                # reused the stale `w` from the previous loop here.
                stdv = 1. / math.sqrt(b.size(0))
                b.data.uniform_(-stdv, stdv)
v.data.fill_(0)
def inner_train(self, features, adj_norm, idx_train, idx_unlabeled, labels):
self._initialize()
for ix in range(len(self.hidden_sizes) + 1):
self.weights[ix] = self.weights[ix].detach()
self.weights[ix].requires_grad = True
self.w_velocities[ix] = self.w_velocities[ix].detach()
self.w_velocities[ix].requires_grad = True
if self.with_bias:
self.biases[ix] = self.biases[ix].detach()
self.biases[ix].requires_grad = True
self.b_velocities[ix] = self.b_velocities[ix].detach()
self.b_velocities[ix].requires_grad = True
for j in range(self.train_iters):
hidden = features
for ix, w in enumerate(self.weights):
b = self.biases[ix] if self.with_bias else 0
if self.sparse_features:
hidden = adj_norm @ torch.spmm(hidden, w) + b
else:
hidden = adj_norm @ hidden @ w + b
if self.with_relu and ix != len(self.weights) - 1:
hidden = F.relu(hidden)
output = F.log_softmax(hidden, dim=1)
loss_labeled = F.nll_loss(output[idx_train], labels[idx_train])
weight_grads = torch.autograd.grad(loss_labeled, self.weights, create_graph=True)
self.w_velocities = [self.momentum * v + g for v, g in zip(self.w_velocities, weight_grads)]
if self.with_bias:
bias_grads = torch.autograd.grad(loss_labeled, self.biases, create_graph=True)
self.b_velocities = [self.momentum * v + g for v, g in zip(self.b_velocities, bias_grads)]
self.weights = [w - self.lr * v for w, v in zip(self.weights, self.w_velocities)]
if self.with_bias:
self.biases = [b - self.lr * v for b, v in zip(self.biases, self.b_velocities)]
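    # Illustrative note: the inner loop above is ordinary SGD with momentum,
    # written out functionally (plain tensor ops instead of an optimizer) and
    # with create_graph=True, so the trained weights remain differentiable
    # functions of `adj_norm`. This is what lets `get_meta_grad` backpropagate
    # through the whole training trajectory to `self.adj_changes`.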
def get_meta_grad(self, features, adj_norm, idx_train, idx_unlabeled, labels, labels_self_training):
hidden = features
for ix, w in enumerate(self.weights):
b = self.biases[ix] if self.with_bias else 0
if self.sparse_features:
hidden = adj_norm @ torch.spmm(hidden, w) + b
else:
hidden = adj_norm @ hidden @ w + b
if self.with_relu and ix != len(self.weights) - 1:
hidden = F.relu(hidden)
output = F.log_softmax(hidden, dim=1)
loss_labeled = F.nll_loss(output[idx_train], labels[idx_train])
loss_unlabeled = F.nll_loss(output[idx_unlabeled], labels_self_training[idx_unlabeled])
loss_test_val = F.nll_loss(output[idx_unlabeled], labels[idx_unlabeled])
if self.lambda_ == 1:
attack_loss = loss_labeled
elif self.lambda_ == 0:
attack_loss = loss_unlabeled
else:
attack_loss = self.lambda_ * loss_labeled + (1 - self.lambda_) * loss_unlabeled
        print('GCN loss on unlabeled data: {}'.format(loss_test_val.item()))
        print('GCN acc on unlabeled data: {}'.format(utils.accuracy(output[idx_unlabeled], labels[idx_unlabeled]).item()))
print('attack loss: {}'.format(attack_loss.item()))
adj_grad, feature_grad = None, None
if self.attack_structure:
adj_grad = torch.autograd.grad(attack_loss, self.adj_changes, retain_graph=True)[0]
if self.attack_features:
feature_grad = torch.autograd.grad(attack_loss, self.feature_changes, retain_graph=True)[0]
return adj_grad, feature_grad
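    # Illustrative note: with lambda_=1 the attack loss uses only the training
    # loss, and with lambda_=0 only the self-training loss on unlabeled nodes
    # (the paper's Meta-Train and Meta-Self variants); intermediate values
    # interpolate between the two objectives of Eq. (10).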
    def attack(self, ori_features, ori_adj, labels, idx_train, idx_unlabeled, n_perturbations, ll_constraint=True, ll_cutoff=0.004):
"""Generate n_perturbations on the input graph.
Parameters
----------
ori_features :
Original (unperturbed) node feature matrix
ori_adj :
Original (unperturbed) adjacency matrix
labels :
node labels
idx_train :
node training indices
        idx_unlabeled :
            unlabeled node indices
        n_perturbations : int
            Number of perturbations on the input graph. Perturbations could
            be edge removals/additions or feature removals/additions.
        ll_constraint : bool
            whether to exert the likelihood ratio test constraint
        ll_cutoff : float
            The critical value for the likelihood ratio test of the power law distributions.
            See the Chi square distribution with one degree of freedom. The default value
            0.004 corresponds to a p-value of roughly 0.95. It is ignored if
            `ll_constraint` is False.
"""
self.sparse_features = sp.issparse(ori_features)
ori_adj, ori_features, labels = utils.to_tensor(ori_adj, ori_features, labels, device=self.device)
labels_self_training = self.self_training_label(labels, idx_train)
modified_adj = ori_adj
modified_features = ori_features
for i in tqdm(range(n_perturbations), desc="Perturbing graph"):
if self.attack_structure:
modified_adj = self.get_modified_adj(ori_adj)
if self.attack_features:
modified_features = ori_features + self.feature_changes
adj_norm = utils.normalize_adj_tensor(modified_adj)
self.inner_train(modified_features, adj_norm, idx_train, idx_unlabeled, labels)
adj_grad, feature_grad = self.get_meta_grad(modified_features, adj_norm, idx_train, idx_unlabeled, labels, labels_self_training)
adj_meta_score = torch.tensor(0.0).to(self.device)
feature_meta_score = torch.tensor(0.0).to(self.device)
if self.attack_structure:
adj_meta_score = self.get_adj_score(adj_grad, modified_adj, ori_adj, ll_constraint, ll_cutoff)
if self.attack_features:
feature_meta_score = self.get_feature_score(feature_grad, modified_features)
if adj_meta_score.max() >= feature_meta_score.max():
adj_meta_argmax = torch.argmax(adj_meta_score)
row_idx, col_idx = utils.unravel_index(adj_meta_argmax, ori_adj.shape)
self.adj_changes.data[row_idx][col_idx] += (-2 * modified_adj[row_idx][col_idx] + 1)
self.adj_changes.data[col_idx][row_idx] += (-2 * modified_adj[row_idx][col_idx] + 1)
else:
feature_meta_argmax = torch.argmax(feature_meta_score)
row_idx, col_idx = utils.unravel_index(feature_meta_argmax, ori_features.shape)
self.feature_changes.data[row_idx][col_idx] += (-2 * modified_features[row_idx][col_idx] + 1)
if self.attack_structure:
self.modified_adj = self.get_modified_adj(ori_adj).detach()
if self.attack_features:
self.modified_features = self.get_modified_features(ori_features).detach()
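    # A minimal follow-up sketch (hypothetical variable names): after attack(),
    # `modified_adj` is a dense torch tensor, so a common next step is to
    # convert it back to scipy and retrain a victim GCN on the poisoned graph:
    #
    #   >>> import scipy.sparse as sp
    #   >>> poisoned = sp.csr_matrix(model.modified_adj.cpu().numpy())
    #   >>> victim = GCN(nfeat=features.shape[1], nclass=labels.max().item()+1,
    #   ...              nhid=16, device='cpu').to('cpu')
    #   >>> victim.fit(features, poisoned, labels, idx_train)
    #   >>> victim.test(idx_test)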
class MetaApprox(BaseMeta):
"""Approximated version of Meta Attack. Adversarial Attacks on
Graph Neural Networks via Meta Learning, ICLR 2019.
Examples
--------
>>> import numpy as np
>>> from deeprobust.graph.data import Dataset
>>> from deeprobust.graph.defense import GCN
>>> from deeprobust.graph.global_attack import MetaApprox
>>> from deeprobust.graph.utils import preprocess
>>> data = Dataset(root='/tmp/', name='cora')
>>> adj, features, labels = data.adj, data.features, data.labels
    >>> adj, features, labels = preprocess(adj, features, labels, preprocess_adj=False) # convert to tensor
>>> idx_train, idx_val, idx_test = data.idx_train, data.idx_val, data.idx_test
>>> idx_unlabeled = np.union1d(idx_val, idx_test)
>>> # Setup Surrogate model
    >>> surrogate = GCN(nfeat=features.shape[1], nclass=labels.max().item()+1,
    ...                 nhid=16, dropout=0, with_relu=False, with_bias=False, device='cpu').to('cpu')
>>> surrogate.fit(features, adj, labels, idx_train, idx_val, patience=30)
>>> # Setup Attack Model
    >>> model = MetaApprox(surrogate, nnodes=adj.shape[0], feature_shape=features.shape,
    ...                    attack_structure=True, attack_features=False, device='cpu', lambda_=0).to('cpu')
>>> # Attack
>>> model.attack(features, adj, labels, idx_train, idx_unlabeled, n_perturbations=10, ll_constraint=True)
>>> modified_adj = model.modified_adj
"""
def __init__(self, model, nnodes, feature_shape=None, attack_structure=True, attack_features=False, device='cpu', with_bias=False, lambda_=0.5, train_iters=100, lr=0.01):
super(MetaApprox, self).__init__(model, nnodes, feature_shape, lambda_, attack_structure, attack_features, device)
self.lr = lr
self.train_iters = train_iters
self.adj_meta_grad = None
self.features_meta_grad = None
if self.attack_structure:
self.adj_grad_sum = torch.zeros(nnodes, nnodes).to(device)
if self.attack_features:
self.feature_grad_sum = torch.zeros(feature_shape).to(device)
self.with_bias = with_bias
self.weights = []
self.biases = []
        # Mirror Metattack: pull the surrogate's dimensions onto self, since
        # BaseMeta does not set them and they are used below.
        self.hidden_sizes = self.surrogate.hidden_sizes
        self.nfeat = self.surrogate.nfeat
        self.nclass = self.surrogate.nclass
        previous_size = self.nfeat
for ix, nhid in enumerate(self.hidden_sizes):
weight = Parameter(torch.FloatTensor(previous_size, nhid).to(device))
bias = Parameter(torch.FloatTensor(nhid).to(device))
previous_size = nhid
self.weights.append(weight)
self.biases.append(bias)
output_weight = Parameter(torch.FloatTensor(previous_size, self.nclass).to(device))
output_bias = Parameter(torch.FloatTensor(self.nclass).to(device))
self.weights.append(output_weight)
self.biases.append(output_bias)
        self.optimizer = optim.Adam(self.weights + self.biases, lr=lr)
self._initialize()
def _initialize(self):
for w, b in zip(self.weights, self.biases):
stdv = 1. / math.sqrt(w.size(1))
w.data.uniform_(-stdv, stdv)
b.data.uniform_(-stdv, stdv)
self.optimizer = optim.Adam(self.weights + self.biases, lr=self.lr)
def inner_train(self, features, modified_adj, idx_train, idx_unlabeled, labels, labels_self_training):
adj_norm = utils.normalize_adj_tensor(modified_adj)
for j in range(self.train_iters):
hidden = features
for ix, w in enumerate(self.weights):
b = self.biases[ix] if self.with_bias else 0
if self.sparse_features:
hidden = adj_norm @ torch.spmm(hidden, w) + b
else:
hidden = adj_norm @ hidden @ w + b
                if self.with_relu and ix != len(self.weights) - 1:
hidden = F.relu(hidden)
output = F.log_softmax(hidden, dim=1)
loss_labeled = F.nll_loss(output[idx_train], labels[idx_train])
loss_unlabeled = F.nll_loss(output[idx_unlabeled], labels_self_training[idx_unlabeled])
if self.lambda_ == 1:
attack_loss = loss_labeled
elif self.lambda_ == 0:
attack_loss = loss_unlabeled
else:
attack_loss = self.lambda_ * loss_labeled + (1 - self.lambda_) * loss_unlabeled
self.optimizer.zero_grad()
loss_labeled.backward(retain_graph=True)
if self.attack_structure:
self.adj_changes.grad.zero_()
self.adj_grad_sum += torch.autograd.grad(attack_loss, self.adj_changes, retain_graph=True)[0]
if self.attack_features:
self.feature_changes.grad.zero_()
self.feature_grad_sum += torch.autograd.grad(attack_loss, self.feature_changes, retain_graph=True)[0]
self.optimizer.step()
loss_test_val = F.nll_loss(output[idx_unlabeled], labels[idx_unlabeled])
        print('GCN loss on unlabeled data: {}'.format(loss_test_val.item()))
        print('GCN acc on unlabeled data: {}'.format(utils.accuracy(output[idx_unlabeled], labels[idx_unlabeled]).item()))
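    # Illustrative note: unlike Metattack, which differentiates through the
    # whole unrolled training run, MetaApprox accumulates the gradient of the
    # attack loss w.r.t. `adj_changes` into `adj_grad_sum` at every inner step,
    # while the weights themselves are trained on the labeled loss with a
    # regular Adam optimizer. This approximates the meta-gradient at a much
    # lower memory cost.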
    def attack(self, ori_features, ori_adj, labels, idx_train, idx_unlabeled, n_perturbations, ll_constraint=True, ll_cutoff=0.004):
"""Generate n_perturbations on the input graph.
Parameters
----------
ori_features :
Original (unperturbed) node feature matrix
ori_adj :
Original (unperturbed) adjacency matrix
labels :
node labels
idx_train :
node training indices
        idx_unlabeled :
            unlabeled node indices
        n_perturbations : int
            Number of perturbations on the input graph. Perturbations could
            be edge removals/additions or feature removals/additions.
        ll_constraint : bool
            whether to exert the likelihood ratio test constraint
        ll_cutoff : float
            The critical value for the likelihood ratio test of the power law distributions.
            See the Chi square distribution with one degree of freedom. The default value
            0.004 corresponds to a p-value of roughly 0.95. It is ignored if
            `ll_constraint` is False.
"""
ori_adj, ori_features, labels = utils.to_tensor(ori_adj, ori_features, labels, device=self.device)
labels_self_training = self.self_training_label(labels, idx_train)
self.sparse_features = sp.issparse(ori_features)
modified_adj = ori_adj
modified_features = ori_features
for i in tqdm(range(n_perturbations), desc="Perturbing graph"):
self._initialize()
if self.attack_structure:
modified_adj = self.get_modified_adj(ori_adj)
self.adj_grad_sum.data.fill_(0)
if self.attack_features:
modified_features = ori_features + self.feature_changes
self.feature_grad_sum.data.fill_(0)
self.inner_train(modified_features, modified_adj, idx_train, idx_unlabeled, labels, labels_self_training)
adj_meta_score = torch.tensor(0.0).to(self.device)
feature_meta_score = torch.tensor(0.0).to(self.device)
if self.attack_structure:
adj_meta_score = self.get_adj_score(self.adj_grad_sum, modified_adj, ori_adj, ll_constraint, ll_cutoff)
if self.attack_features:
feature_meta_score = self.get_feature_score(self.feature_grad_sum, modified_features)
if adj_meta_score.max() >= feature_meta_score.max():
adj_meta_argmax = torch.argmax(adj_meta_score)
row_idx, col_idx = utils.unravel_index(adj_meta_argmax, ori_adj.shape)
self.adj_changes.data[row_idx][col_idx] += (-2 * modified_adj[row_idx][col_idx] + 1)
self.adj_changes.data[col_idx][row_idx] += (-2 * modified_adj[row_idx][col_idx] + 1)
else:
feature_meta_argmax = torch.argmax(feature_meta_score)
row_idx, col_idx = utils.unravel_index(feature_meta_argmax, ori_features.shape)
self.feature_changes.data[row_idx][col_idx] += (-2 * modified_features[row_idx][col_idx] + 1)
if self.attack_structure:
self.modified_adj = self.get_modified_adj(ori_adj).detach()
if self.attack_features:
self.modified_features = self.get_modified_features(ori_features).detach()
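
if __name__ == '__main__':
    # A minimal smoke-test sketch, mirroring the doctest in the Metattack
    # docstring above (assumes the Cora dataset can be downloaded to /tmp/).
    from deeprobust.graph.data import Dataset
    from deeprobust.graph.defense import GCN

    data = Dataset(root='/tmp/', name='cora')
    adj, features, labels = data.adj, data.features, data.labels
    idx_train, idx_val, idx_test = data.idx_train, data.idx_val, data.idx_test
    idx_unlabeled = np.union1d(idx_val, idx_test)

    # Train the linear surrogate GCN the attack differentiates through.
    surrogate = GCN(nfeat=features.shape[1], nclass=labels.max().item()+1,
                    nhid=16, dropout=0, with_relu=False, with_bias=False,
                    device='cpu').to('cpu')
    surrogate.fit(features, adj, labels, idx_train, idx_val, patience=30)

    # Run the structure attack with the Meta-Self objective (lambda_=0).
    model = Metattack(surrogate, nnodes=adj.shape[0], feature_shape=features.shape,
                      attack_structure=True, attack_features=False,
                      device='cpu', lambda_=0).to('cpu')
    model.attack(features, adj, labels, idx_train, idx_unlabeled,
                 n_perturbations=10, ll_constraint=False)
    print(model.modified_adj)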