Source code for secml.adv.attacks.poisoning.c_attack_poisoning

"""
.. module:: CAttackPoisoning
   :synopsis: Interface for poisoning attacks

.. moduleauthor:: Ambra Demontis <ambra.demontis@unica.it>
.. moduleauthor:: Battista Biggio <battista.biggio@unica.it>

"""
import warnings
from abc import ABCMeta, abstractmethod

from secml.adv.attacks import CAttack, CAttackMixin
from secml.optim.optimizers import COptimizer
from secml.array import CArray
from secml.data import CDataset
from secml.ml.classifiers.loss import CLoss
from secml.ml.peval.metrics import CMetric
from secml.optim.constraints import CConstraint
from secml.optim.function import CFunction


class CAttackPoisoning(CAttackMixin, metaclass=ABCMeta):
    """Interface for poisoning attacks.

    Parameters
    ----------
    classifier : CClassifier
        Target classifier.
    training_data : CDataset
        Dataset on which the classifier has been trained.
    val : CDataset
        Validation set.
    distance : {'l1', 'l2'}, optional
        Norm to use for computing the distance of the adversarial
        example from the original sample. Default 'l2'.
    dmax : scalar, optional
        Maximum value of the perturbation. Default 0.
    lb, ub : int or CArray, optional
        Lower/Upper bounds. If int, the same bound will be applied to
        all the features. If CArray, a different bound can be specified
        for each feature. Default `lb = 0`, `ub = 1`.
    y_target : int or None, optional
        If None, an error-generic attack will be performed, else an
        error-specific attack to have the samples misclassified as
        belonging to the `y_target` class.
    solver_type : str or None, optional
        Identifier of the solver to be used. Default 'pgd-ls'.
    solver_params : dict or None, optional
        Parameters for the solver. Default None, meaning that default
        parameters will be used.
    init_type : {'random', 'loss_based'}, optional
        Strategy used to choose the initial random samples.
        Default 'random'.
    random_seed : int or None, optional
        If int, random_state is the seed used by the random number
        generator. If None, no fixed seed will be set.

    """
    __super__ = 'CAttackPoisoning'

    def __init__(self, classifier,
                 training_data,
                 val,
                 distance='l2',
                 dmax=0,
                 lb=0,
                 ub=1,
                 y_target=None,
                 solver_type='pgd-ls',
                 solver_params=None,
                 init_type='random',
                 random_seed=None):

        super(CAttackPoisoning, self).__init__(
            classifier=classifier,
            distance=distance,
            dmax=dmax,
            lb=lb,
            ub=ub,
            solver_type=solver_type,
            solver_params=solver_params)

        # FIXME: validation loss should be optional and passed from outside
        if classifier.class_type == 'svm':
            loss_name = 'hinge'
        elif classifier.class_type == 'logistic':
            loss_name = 'log'
        elif classifier.class_type == 'ridge':
            loss_name = 'square'
        else:
            raise NotImplementedError(
                "Poisoning is not implemented for this classifier!")

        self._attacker_loss = CLoss.create(loss_name)
        self._init_loss = self._attacker_loss

        self.y_target = y_target

        # hashing xc to avoid re-training clf when xc does not change
        self._xc_hash = None

        self._x0 = None  # initial poisoning sample features
        self._xc = None  # set of poisoning points, along with their labels yc
        self._yc = None
        self._idx = None  # index of the current point to be optimized

        self._training_data = None  # training set used to learn classifier
        self._n_points = None  # FIXME: INIT PARAM?
        # READ/WRITE
        self.val = val  # attacker's validation set
        self.training_data = training_data
        self.random_seed = random_seed
        self.init_type = init_type
        # eta is required by some poisoning solvers; guard against
        # solver_params being None (which the docstring allows)
        self.eta = solver_params['eta'] if solver_params is not None else None

        # this is used to speed up some poisoning algorithms by re-using
        # the solution obtained at a previous step of the optimization
        self._warm_start = None

    @property
    def y_target(self):
        return self._y_target

    @y_target.setter
    def y_target(self, value):
        self._y_target = value

    ###########################################################################
    # READ-WRITE ATTRIBUTES
    ###########################################################################

    @property
    def val(self):
        """Returns the attacker's validation data."""
        return self._val

    @val.setter
    def val(self, value):
        """Sets the attacker's validation data."""
        if value is None:
            self._val = None
            return
        if not isinstance(value, CDataset):
            raise TypeError('val should be a CDataset!')
        self._val = value

    @property
    def training_data(self):
        """Returns the training set used to learn the targeted classifier."""
        return self._training_data

    @training_data.setter
    def training_data(self, value):
        """Sets the training set used to learn the targeted classifier."""
        # mandatory parameter, we raise error also if value is None
        if not isinstance(value, CDataset):
            raise TypeError('training_data should be a CDataset!')
        self._training_data = value

    @property
    def random_seed(self):
        """Returns the seed of the attacker's random number generator."""
        return self._random_seed

    @random_seed.setter
    def random_seed(self, value):
        """Sets the seed of the attacker's random number generator."""
        self._random_seed = value

    @property
    def n_points(self):
        """Returns the number of poisoning points."""
        return self._n_points

    @n_points.setter
    def n_points(self, value):
        """Sets the number of poisoning points."""
        if value is None:
            self._n_points = None
            return
        self._n_points = int(value)

    @property
    def x0(self):
        """Returns the attacker's initial sample features."""
        return self._x0

    @x0.setter
    def x0(self, value):
        """Sets the attacker's initial sample features."""
        self._x0 = value

    @property
    def xc(self):
        """Returns the attacker's sample features."""
        return self._xc

    @xc.setter
    def xc(self, value):
        """Sets the attacker's sample features."""
        self._xc = value

    @property
    def yc(self):
        """Returns the attacker's sample label."""
        return self._yc

    @yc.setter
    def yc(self, value):
        """Sets the attacker's sample label."""
        self._yc = value

    ###########################################################################
    # PRIVATE METHODS
    ###########################################################################

    def _constraint_creation(self):
        # if lb or ub is set to 'x0', only feature increments or
        # decrements w.r.t. the initial point are allowed
        lb = self._x0 if self.lb == 'x0' else self.lb
        ub = self._x0 if self.ub == 'x0' else self.ub
        bounds = CConstraint.create('box', lb=lb, ub=ub)

        constr = CConstraint.create(self.distance, center=0, radius=1e12)

        return bounds, constr

    def _init_solver(self):
        """Create solver instance."""
        if self.classifier is None:
            raise ValueError('Solver not set properly!')

        # map attributes to fun, constr, box
        fun = CFunction(fun=self.objective_function,
                        gradient=self.objective_function_gradient,
                        n_dim=self._classifier.n_features)

        bounds, constr = self._constraint_creation()

        self._solver = COptimizer.create(
            self._solver_type,
            fun=fun, constr=constr,
            bounds=bounds,
            **self.solver_params)
        self._solver.verbose = 0
        self._warm_start = None

    def _rnd_init_poisoning_points(
            self, n_points=None, init_from_val=False, val=None):
        """Returns a random set of poisoning points with flipped labels."""
        if init_from_val:
            if val is not None:
                init_dataset = val
            else:
                init_dataset = self.val
        else:
            init_dataset = self.training_data

        if (self._n_points is None or self._n_points == 0) and (
                n_points is None or n_points == 0):
            raise ValueError("Number of poisoning points (n_points) not set!")

        if n_points is None:
            n_points = self.n_points

        idx = CArray.randsample(init_dataset.num_samples, n_points,
                                random_state=self.random_seed)
        xc = init_dataset.X[idx, :].deepcopy()

        # if the attack is in a continuous space we add a
        # little perturbation to the initial poisoning point
        random_noise = CArray.rand(shape=xc.shape,
                                   random_state=self.random_seed)
        xc += 1e-3 * (2 * random_noise - 1)

        yc = CArray(init_dataset.Y[idx]).deepcopy()  # true labels

        # randomly pick yc from a different class
        for i in range(yc.size):
            labels = CArray.randsample(init_dataset.num_classes, 2,
                                       random_state=self.random_seed)
            if yc[i] == labels[0]:
                yc[i] = labels[1]
            else:
                yc[i] = labels[0]

        return xc, yc

    def _update_poisoned_clf(self, clf=None, tr=None,
                             train_normalizer=False):
        """Trains the classifier on the original training data plus the
        current set of poisoning points (xc, yc).

        Parameters
        ----------
        clf : CClassifier or None, optional
            Classifier to be poisoned. If None, `self.classifier` is used.
        tr : CDataset or None, optional
            Training set. If None, `self.training_data` is used.
        train_normalizer : bool, optional
            If True, the normalizer is also retrained. Default False.

        Returns
        -------
        clf : classifier trained on tr plus the poisoning points
        tr : the extended training set

        """
        # xc hashing is only valid if clf and tr do not change
        # (when calling update_poisoned_clf() without parameters)
        xc_hash_is_valid = False
        if clf is None and tr is None:
            xc_hash_is_valid = True

        if clf is None:
            clf = self.classifier

        if tr is None:
            tr = self.training_data

        tr = tr.append(CDataset(self._xc, self._yc))

        xc_hash = self._xc.sha1()

        if self._xc_hash is None or self._xc_hash != xc_hash:
            # xc set has changed, retrain clf
            # hash is stored only if update_poisoned_clf() is called w/out pars
            self._xc_hash = xc_hash if xc_hash_is_valid else None
            self._poisoned_clf = clf.deepcopy()

            # we assume that the normalizer is not changing w.r.t. xc!
            # so we avoid re-training the normalizer on a dataset including xc
            if self.classifier.preprocess is not None:
                self._poisoned_clf.retrain_normalizer = train_normalizer

            self._poisoned_clf.fit(tr.X, tr.Y)

        return self._poisoned_clf, tr
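    # A minimal sketch of the caching behavior above (with a hypothetical
    # `attack` instance whose `_xc`/`_yc` are already set): two consecutive
    # calls with an unchanged set of poisoning points trigger a single
    # retraining, since the SHA1 hash of xc matches on the second call.
    #
    #     clf1, tr1 = attack._update_poisoned_clf()  # retrains the clf
    #     clf2, tr2 = attack._update_poisoned_clf()  # hash hit, no retrain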
    ###########################################################################
    # OBJECTIVE FUNCTION & GRAD COMPUTATION
    ###########################################################################

    def objective_function(self, xc, acc=False):
        """
        Parameters
        ----------
        xc : CArray
            Candidate poisoning point.
        acc : bool, optional
            If True, return the test error on the validation set
            instead of the attacker loss. Default False.

        Returns
        -------
        f_obj : value of the objective function (average attacker loss
            on the validation set) at xc.

        """
        # index of the poisoning point within self._xc
        # that will be replaced by the input parameter xc
        if self._idx is None:
            idx = 0
        else:
            idx = self._idx

        xc = CArray(xc).atleast_2d()

        n_samples = xc.shape[0]
        if n_samples > 1:
            raise TypeError("xc is not a single sample!")

        self._xc[idx, :] = xc
        clf, tr = self._update_poisoned_clf()

        y_pred, score = clf.predict(self.val.X, return_decision_function=True)

        # targeted attacks
        y_ts = CArray(self._y_target).repeat(score.shape[0]) \
            if self._y_target is not None else self.val.Y

        # TODO: binary loss check
        if self._attacker_loss.class_type != 'softmax':
            score = CArray(score[:, 1].ravel())

        if acc is True:
            error = CArray(y_ts != y_pred).ravel()  # compute test error
        else:
            error = self._attacker_loss.loss(y_ts, score)

        obj = error.mean()

        return obj
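    # Usage sketch (assumptions for illustration: `attack` is a concrete
    # subclass instance with `xc`/`yc` initialized, `x0` a CArray candidate
    # point). For a binary SVM target, the first value is the average hinge
    # loss of the poisoned classifier on the attacker's validation set;
    # with acc=True the validation test error is returned instead.
    #
    #     obj = attack.objective_function(x0)
    #     err = attack.objective_function(x0, acc=True)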
    def objective_function_gradient(self, xc, normalization=True):
        r"""Compute the derivative of the attacker loss w.r.t. the
        attack sample xc.

        The derivative is decomposed, via the chain rule, as a sum
        over the C outputs of the classifier:

        .. math::
           \frac{\partial \ell}{\partial x_c} =
               \sum_{k=1}^{C}
               \frac{\partial \ell}{\partial f_k}
               \frac{\partial f_k}{\partial x_c}

        """
        xc = xc.atleast_2d()
        n_samples = xc.shape[0]
        if n_samples > 1:
            raise TypeError("xc is not a single sample!")

        # index of the poisoning point within self._xc
        # that will be replaced by the input parameter xc
        if self._idx is None:
            idx = 0
        else:
            idx = self._idx

        self._xc[idx, :] = xc
        clf, tr = self._update_poisoned_clf()

        # computing gradient of loss(y, f(x)) w.r.t. f
        _, score = clf.predict(self.val.X, return_decision_function=True)

        y_ts = CArray(self._y_target).repeat(score.shape[0]) \
            if self._y_target is not None else self.val.Y

        grad = CArray.zeros((xc.size,))

        if clf.n_classes <= 2:
            loss_grad = self._attacker_loss.dloss(
                y_ts, CArray(score[:, 1]).ravel())
            grad = self._gradient_fk_xc(
                self._xc[idx, :], self._yc[idx], clf, loss_grad, tr)
        else:
            # compute the gradient as a sum of the gradients for each class
            for c in range(clf.n_classes):
                loss_grad = self._attacker_loss.dloss(y_ts, score, c=c)

                grad += self._gradient_fk_xc(self._xc[idx, :], self._yc[idx],
                                             clf, loss_grad, tr, c)

        if normalization:
            norm = grad.norm()
            return grad / norm if norm > 0 else grad
        else:
            return grad
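    # A quick way to sanity-check this gradient is a central
    # finite-difference approximation of `objective_function` (a sketch
    # under the same assumptions as above: hypothetical `attack` with
    # `xc`/`yc` set, `x0` a CArray point; the tolerance is indicative).
    #
    #     eps = 1e-4
    #     fd_grad = CArray.zeros((x0.size,))
    #     for i in range(x0.size):
    #         e = CArray.zeros((x0.size,))
    #         e[i] = eps
    #         fd_grad[i] = (attack.objective_function(x0 + e) -
    #                       attack.objective_function(x0 - e)) / (2 * eps)
    #     an_grad = attack.objective_function_gradient(
    #         x0, normalization=False)
    #     assert (fd_grad - an_grad).norm() < 1e-2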
    @abstractmethod
    def _gradient_fk_xc(self, xc, yc, clf, loss_grad, tr, k=None):
        """
        Derivative of the classifier's discriminant function f_k(x),
        computed on a set of points x, w.r.t. a single poisoning point xc.

        This is a classifier-specific implementation, so we delegate
        its implementation to inherited classes.

        """
        pass

    ###########################################################################
    # POISONING INTERNAL ROUTINE ON SINGLE DATA POINT (PRIVATE)
    ###########################################################################

    def _run(self, xc, yc, idx=0):
        """Single point poisoning.

        Here xc can be a *set* of points, in which case idx specifies
        which point should be manipulated by the poisoning attack.

        """
        xc = CArray(xc.deepcopy()).atleast_2d()

        self._yc = yc
        self._xc = xc
        self._idx = idx  # point to be optimized within xc

        self._x0 = self._xc[idx, :].ravel()

        self._init_solver()

        if self.y_target is None:  # error-generic (indiscriminate) attack
            x = self._solver.maximize(self._x0)
        else:  # error-specific (targeted) attack
            x = self._solver.minimize(self._x0)

        self._solution_from_solver()

        return x
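    # Note the sign convention above: error-generic attacks *maximize* the
    # validation loss of the poisoned classifier, while error-specific
    # attacks *minimize* the loss computed against the attacker-chosen
    # labels y_target. E.g. (hypothetical instance, for illustration only):
    #
    #     attack.y_target = None          # maximize validation loss
    #     x_opt = attack._run(xc, yc, idx=0)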
    ###########################################################################
    # PUBLIC METHODS
    ###########################################################################

    def run(self, x, y, ds_init=None, max_iter=1):
        """Runs poisoning on multiple points.

        It reads n_points (previously set), initializes xc, yc at
        random, and then optimizes the poisoning points xc.

        Parameters
        ----------
        x : CArray
            Validation set for evaluating classifier performance.
            Note that this is not the validation data used by the
            attacker, which should be passed instead to
            `CAttackPoisoning` init.
        y : CArray
            Corresponding true labels for samples in `x`.
        ds_init : CDataset or None, optional
            Dataset for warm start.
        max_iter : int, optional
            Number of iterations to re-optimize poisoning data.
            Default 1.

        Returns
        -------
        y_pred : predicted labels for all val samples by targeted classifier
        scores : scores for all val samples by targeted classifier
        adv_xc : manipulated poisoning points xc (for subsequent warm starts)
        f_opt : final value of the objective function

        """
        if self._n_points is None or self._n_points == 0:
            # evaluate performance on x, y
            y_pred, scores = self._classifier.predict(
                x, return_decision_function=True)
            return y_pred, scores, ds_init, 0

        # n_points > 0
        if self.init_type == 'random':
            # randomly sample xc and yc
            xc, yc = self._rnd_init_poisoning_points()
        elif self.init_type == 'loss_based':
            xc, yc = self._loss_based_init_poisoning_points()
        else:
            raise NotImplementedError(
                "Unknown poisoning point initialization strategy.")

        # re-set previously-optimized points if passed as input
        if ds_init is not None:
            xc[0:ds_init.num_samples, :] = ds_init.X
            yc[0:ds_init.num_samples] = ds_init.Y

        delta = 1.0
        k = 0

        # max_iter ignored for single-point attacks
        if self.n_points == 1:
            max_iter = 1

        metric = CMetric.create('accuracy')

        while delta > 0 and k < max_iter:

            self.logger.info(
                "Iter on all the poisoning samples: {:}".format(k))

            xc_prv = xc.deepcopy()
            for i in range(self._n_points):
                # optimize the last points first
                # (and then re-optimize the first ones)
                idx = self.n_points - i - 1
                xc[idx, :] = self._run(xc, yc, idx=idx)
                self.logger.info(
                    "poisoning point {:} optim fopt: {:}".format(
                        i, self._f_opt))

                y_pred, scores = self._poisoned_clf.predict(
                    x, return_decision_function=True)
                acc = metric.performance_score(y_true=y, y_pred=y_pred)
                self.logger.info("Poisoned classifier accuracy "
                                 "on test data {:}".format(acc))

            delta = (xc_prv - xc).norm_2d()
            self.logger.info(
                "Optimization with n points: " + str(self._n_points) +
                " iter: " + str(k) + ", delta: " +
                str(delta) + ", fopt: " + str(self._f_opt))
            k += 1

        # re-train the targeted classifier (copied) on poisoned data
        # to evaluate attack effectiveness on the targeted classifier
        clf, tr = self._update_poisoned_clf(clf=self._classifier,
                                            tr=self._training_data,
                                            train_normalizer=False)
        # FIXME: re-change train_normalizer=True

        y_pred, scores = clf.predict(x, return_decision_function=True)
        acc = metric.performance_score(y_true=y, y_pred=y_pred)
        self.logger.info(
            "Original classifier accuracy on test data {:}".format(acc))

        return y_pred, scores, CDataset(xc, yc), self._f_opt
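    # End-to-end usage sketch (hedged: assumes a trained CClassifierSVM
    # `clf`, CDatasets `tr`, `val` and `ts`, and the CAttackPoisoningSVM
    # subclass shipped with secml; the solver parameter values shown are
    # illustrative, not prescriptive):
    #
    #     from secml.adv.attacks import CAttackPoisoningSVM
    #     attack = CAttackPoisoningSVM(classifier=clf,
    #                                  training_data=tr, val=val,
    #                                  lb=0, ub=1,
    #                                  solver_params={'eta': 0.05,
    #                                                 'max_iter': 100},
    #                                  random_seed=0)
    #     attack.n_points = 10  # number of poisoning points to optimize
    #     y_pred, scores, adv_ds, f_opt = attack.run(ts.X, ts.Y)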
    def _loss_based_init_poisoning_points(self, n_points=None):
        """Initialize poisoning points with a loss-based strategy.

        Not implemented in this base class.
        """
        raise NotImplementedError

    def _compute_grad_inv(self, G, H, grad_loss_params):

        from scipy import linalg
        det = linalg.det(H.tondarray())
        if abs(det) < 1e-6:
            # the pseudo-inverse handles a (nearly) singular H
            H_inv = CArray(linalg.pinv(H.tondarray()))
        else:
            H_inv = CArray(linalg.inv(H.tondarray()))
        grad_mat = - CArray(G.dot(H_inv))  # d * (d + 1)

        self._d_params_xc = grad_mat

        gt = grad_mat.dot(grad_loss_params)
        return gt.ravel()

    def _compute_grad_solve(self, G, H, grad_loss_params, sym_pos=True):

        from scipy import linalg
        # assume_a='pos' exploits the symmetric positive-definite
        # structure of H when sym_pos is True
        v = linalg.solve(
            H.tondarray(), grad_loss_params.tondarray(),
            assume_a='pos' if sym_pos else 'gen')
        v = CArray(v)
        gt = -G.dot(v)
        return gt.ravel()

    def _compute_grad_solve_iterative(self, G, H, grad_loss_params,
                                      tol=1e-6):

        from scipy.sparse import linalg

        if self._warm_start is None:
            v, convergence = linalg.cg(
                H.tondarray(), grad_loss_params.tondarray(), tol=tol)
        else:
            v, convergence = linalg.cg(
                H.tondarray(), grad_loss_params.tondarray(), tol=tol,
                x0=self._warm_start.tondarray())

        if convergence != 0:
            warnings.warn('Convergence of poisoning algorithm not reached!')

        v = CArray(v.ravel())

        # store v to be used as warm start at the next iteration
        self._warm_start = v

        gt = -G.dot(v.T)
        return gt.ravel()
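    # The three helpers above all evaluate the same implicit gradient,
    #
    #     gt = -G @ inv(H) @ grad_w_loss
    #
    # where (in our reading of the code, following the implicit
    # differentiation of the learner's equilibrium conditions used in
    # gradient-based poisoning) H is the Hessian of the learner's training
    # objective w.r.t. its parameters w, G is the mixed derivative of that
    # objective w.r.t. (xc, w), and grad_w_loss is the gradient of the
    # attacker loss w.r.t. w. They trade accuracy for speed: explicit
    # (pseudo-)inversion, a direct symmetric solve, and a warm-started
    # conjugate-gradient solve, respectively.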