Source code for secml.adv.attacks.evasion.cleverhans.c_attack_evasion_cleverhans

"""
.. module:: CAttackEvasionCleverhans
    :synopsis: Performs one of the Cleverhans Evasion attacks
                against a classifier.

.. moduleauthor:: Ambra Demontis <ambra.demontis@unica.it>
.. moduleauthor:: Maura Pintor <maura.pintor@unica.it>

"""
import numpy as np
import tensorflow as tf
from cleverhans.attacks import \
    FastGradientMethod, CarliniWagnerL2, ElasticNetMethod, SPSA, LBFGS, \
    ProjectedGradientDescent, SaliencyMapMethod, MomentumIterativeMethod, \
    MadryEtAl, BasicIterativeMethod, DeepFool
from cleverhans.model import Model

from secml.adv.attacks import CAttack
from secml.adv.attacks.evasion import CAttackEvasion
from secml.adv.attacks.evasion.cleverhans.c_attack_evasion_cleverhans_losses \
    import CAttackEvasionCleverhansLossesMixin
from secml.array import CArray
from secml.core import CCreator
from secml.core.constants import nan
from secml.core.exceptions import NotFittedError
from secml.ml.classifiers.reject import CClassifierReject
from secml.optim.function import CFunction

SUPPORTED_ATTACKS = [
    FastGradientMethod, CarliniWagnerL2, ElasticNetMethod, SPSA, LBFGS,
    ProjectedGradientDescent, SaliencyMapMethod, MomentumIterativeMethod,
    MadryEtAl, BasicIterativeMethod, DeepFool
]


[docs]class CAttackEvasionCleverhans(CAttackEvasion, CAttackEvasionCleverhansLossesMixin): """This class is a wrapper of the attacks implemented in the Cleverhans library. Credits: https://github.com/tensorflow/cleverhans. Parameters ---------- classifier : CClassifier Target classifier (trained). y_target : int or None, optional If None an indiscriminate attack will be performed, else a targeted attack to have the samples misclassified as belonging to the y_target class. clvh_attack_class: The CleverHans class that implement the attack store_var_list: list list of variables to store from the graph during attack run. The variables will be stored as key-value dictionary and can be retrieved through the property `stored_vars`. **kwargs Any other parameter for the cleverhans attack. Notes ----- The current Tensorflow default graph will be used. """ __class_type = 'e-cleverhans' def __init__(self, classifier, y_target=None, clvh_attack_class=CarliniWagnerL2, store_var_list=None, **kwargs): self._tfsess = tf.compat.v1.Session() self._eps_0 = False # store the cleverhans attack parameters self.attack_params = kwargs # Check if the cleverhans attack is supported if clvh_attack_class not in SUPPORTED_ATTACKS: raise ValueError("This cleverhans attack is not supported yet!") self._clvrh_attack_class = clvh_attack_class self._clvrh_clf = None self._last_f_eval = 0 self._last_grad_eval = 0 if store_var_list is not None: # first, check if the user has set stored variables self._stored_vars = {k: [] for k in store_var_list} elif any([self._clvrh_attack_class == CarliniWagnerL2, self._clvrh_attack_class == ElasticNetMethod, ]): # store `const` by default for these attacks as it # is needed in the `objective_function` computation self._stored_vars = {'const': []} else: self._stored_vars = None super(CAttackEvasionCleverhans, self).__init__( classifier=classifier, y_target=y_target) self._n_classes = self._classifier.n_classes self._n_feats = self._classifier.n_features self._initialize_tf_ops()
[docs] def set(self, param_name, param_value, copy=False): # we need the possibility of running the attack for eps==0, # this is not allowed in standard cleverhans if 'eps' in param_name: if param_value == 0: param_value = 1 self._eps_0 = True else: self._eps_0 = False if param_name.startswith('attack_params'): super(CAttackEvasionCleverhans, self).set(param_name, param_value, copy) # re-initialize the Tensorflow operations self._initialize_tf_ops()
[docs] def run(self, x, y, ds_init=None): # override run for applying storage of internally # optimized variables if self._stored_vars is not None: for key in self._stored_vars: self._stored_vars[key] = [] return super(CAttackEvasionCleverhans, self).run( x, y, ds_init=ds_init)
########################################################################### # READ-ONLY ATTRIBUTES ########################################################################### @property def f_eval(self): if self._clvrh_clf: return self._last_f_eval else: raise ValueError("Attack not performed yet!") @property def grad_eval(self): if self._clvrh_clf: return self._last_grad_eval else: raise ValueError("Attack not performed yet!") @property def stored_vars(self): """Variables extracted from the graph during execution of the attack. """ return self._stored_vars @property def attack_params(self): """Object containing all Cleverhans parameters """ return self._attack_params @attack_params.setter def attack_params(self, value): """Object containing all Cleverhans parameters """ self._attack_params = _CClvrh_params(value) ########################################################################### # PRIVATE METHODS ###########################################################################
[docs] def objective_function(self, x): """Objective function. Parameters ---------- x : CArray or CDataset Returns ------- f_obj : float or CArray of floats """ if self._clvrh_attack_class == CarliniWagnerL2: return self._objective_function_cw(x) elif self._clvrh_attack_class == ElasticNetMethod: return self._objective_function_elastic_net(x) elif self._clvrh_attack_class == SPSA: return self._objective_function_SPSA(x) elif self._clvrh_attack_class in [ FastGradientMethod, ProjectedGradientDescent, LBFGS, MomentumIterativeMethod, MadryEtAl, BasicIterativeMethod]: return self._objective_function_cross_entropy(x) else: raise NotImplementedError
[docs] def objective_function_gradient(self, x): """Gradient of the objective function.""" raise NotImplementedError
def _create_tf_operations(self): """ Call `generate` from the Cleverhans attack to construct the Tensorflow operation needed to perform the attack """ if self.y_target is None: if 'y' in self._clvrh_attack.feedable_kwargs: self._adv_x_T = self._clvrh_attack.generate( self._initial_x_P, y=self._y_P, **self.attack_params.__dict__) else: # 'y' not required by attack self._adv_x_T = self._clvrh_attack.generate( self._initial_x_P, **self.attack_params.__dict__) else: if 'y_target' not in self._clvrh_attack.feedable_kwargs: raise RuntimeError( "cannot perform a targeted {:} attack".format( self._clvrh_attack.__class__.__name__)) self._adv_x_T = self._clvrh_attack.generate( self._initial_x_P, y_target=self._y_P, **self._attack_params.__dict__) def _initialize_tf_ops(self): # create the cleverhans attack object tf.reset_default_graph() self._tfsess.close() session_conf = tf.compat.v1.ConfigProto( inter_op_parallelism_threads=-1, # Perform in caller's thread use_per_session_threads=False # Per-session thread pools ) self._tfsess = tf.compat.v1.Session(config=session_conf) # wrap the surrogate classifier into a cleverhans classifier self._clvrh_clf = _CModelCleverhans( self.classifier, out_dims=self._n_classes) # create an instance of the chosen cleverhans attack self._clvrh_attack = self._clvrh_attack_class( self._clvrh_clf, sess=self._tfsess) # create the placeholder to feed into the attack the initial evasion # samples self._initial_x_P = tf.compat.v1.placeholder( tf.float32, shape=(None, self._n_feats)) # placeholder used to feed the true or the target label (it is a # one-hot encoded vector) self._y_P = tf.compat.v1.placeholder( tf.float32, shape=(1, self._n_classes)) # call the function of the cleverhans attack called `generate` that # constucts the Tensorflow operation needed to perform the attack self._create_tf_operations() def _define_warning_filter(self): # We filter few warnings raised by numpy, caused by cleverhans self.logger.filterwarnings( "ignore", category=RuntimeWarning, message="invalid value encountered in double_scalars*" ) self.logger.filterwarnings( "ignore", category=RuntimeWarning, message="Mean of empty slice*" ) def _create_one_hot_y(self): """ Cleverhans attacks need to receive y as a one hot vector. y is equal to the y target if y_target is present, otherwhise is equal to the true class of the attack sample. """ one_hot_y = CArray.zeros(shape=(1, self._n_classes), dtype=np.float32) if self.y_target is not None: one_hot_y[0, self.y_target] = 1 else: # indiscriminate attack one_hot_y[0, self._y0.item()] = 1 return one_hot_y def _run(self, x0, y0, x_init=None): """Perform evasion for a given dmax on a single pattern. It solves: min_x g(x), s.t. c(x,x0) <= dmax Parameters ---------- x0 : CArray Initial sample. y0 : int or CArray The true label of x0. x_init : CArray or None, optional Initialization point. If None, it is set to x0. Returns ------- x_opt : CArray Evasion sample f_opt : float Value of objective function on x_opt (from surrogate learner). Notes ----- Internally, this class stores the values of the objective function and sequence of attack points (if enabled). """ self._clvrh_clf.reset_eval() # if data can not be modified by the attacker, exit if not self.is_attack_class(y0): self._x_seq = x_init self._x_opt = x_init self._f_opt = nan self._f_seq = nan return self._x_opt, self._f_opt if x_init is None: x_init = x0 if not isinstance(x_init, CArray): raise TypeError("Input vectors should be of class CArray") if self._eps_0 is True: return x0, nan self._x0 = x0 self._y0 = y0 x = self._x0.atleast_2d().tondarray().astype(np.float32) # initialize caching of the attack path self._clvrh_clf.reset_caching(x0) # create a one-hot-encoded vector to feed the true or # the y_target label one_hot_y = CArray.zeros(shape=(1, self._n_classes), dtype=np.float32) if self.y_target is not None: one_hot_y[0, self.y_target] = 1 else: # indiscriminate attack one_hot_y[0, self._y0.item()] = 1 with self.logger.catch_warnings(): # We filter few warnings raised by numpy, caused by cleverhans self._define_warning_filter() # Cleverhans attacks need to receive y as a one hot vector. # y is equal to the y target if y_target is present, otherwhise is # equal to the true class of the attack sample. one_hot_y = self._create_one_hot_y() self._x_opt = self._tfsess.run( self._adv_x_T, feed_dict={self._initial_x_P: x, self._y_P: one_hot_y.tondarray()}) self._x_opt = CArray(self._x_opt) self._x_seq = self._clvrh_clf._x_seq if (self.x_opt - self._x_seq[0, :]).norm_2d() > 1e-6: self._x_seq = self._x_seq.append(self._x_opt, axis=0) else: # some attack returns at the initial point if # condition is not met (e.g. CWL2) self._x_opt = self._x_seq[-1, :] if self._stored_vars is not None: for key in self._stored_vars: self._stored_vars[key].append(self._get_variable_value(key)) self._last_f_eval = self._clvrh_clf.f_eval self._last_grad_eval = self._clvrh_clf.grad_eval return self._x_opt, nan # TODO: return value of objective_fun(x_opt) def _get_variable_value(self, var_name): const = self._clvrh_clf.get_variable_value(var_name) const_value = self._tfsess.run(const) return const_value
class _CModelCleverhans(Model): """Receive our library classifier and convert it into a cleverhans model. Parameters ---------- clf : CClassifier SecML classifier, should be already trained. out_dims : int or None The expected number of classes. Notes ----- The Tensorflow model will be added to the current Tensorflow default graph. """ @property def f_eval(self): return self._fun.n_fun_eval @property def grad_eval(self): return self._fun.n_grad_eval def reset_eval(self): """Reset the number of evaluations.""" self._fun.reset_eval() def _decision_function(self, x): """ Wrapper of the classifier discriminant function. This is needed because the output of the CFunction should be either a scalar or a CArray whereas the predict function returns a tuple. """ if hasattr(self, '_x_seq') and self._x_seq is not None: if self._is_init is True: # avoid storing twice the initial value self._is_init = False else: # Cache intermediate values self._x_seq = self._x_seq.append(x, axis=0) return self._clf.forward(x, caching=True) # TODO: caching required? def __init__(self, clf, out_dims=None): self._clf = clf if isinstance(clf, CClassifierReject): raise ValueError("classifier with reject cannot be " "converted to a tensorflow model") if not clf.is_fitted(): raise NotFittedError("The classifier should be already trained!") self._out_dims = out_dims # classifier output tensor name. Either "probs" or "logits". self._output_layer = 'logits' # Given a trained CClassifier, creates a tensorflow node for the # network output and one for its gradient self._fun = CFunction(fun=self._decision_function, gradient=clf.gradient) self._callable_fn = _CClassifierToTF(self._fun, self._out_dims) super(_CModelCleverhans, self).__init__(nb_classes=clf.n_classes) def fprop(self, x, **kwargs): """ Parameters ---------- x : np.ndarray Input samples. **kwargs : dict Any other argument for function. Returns ------- dict """ if len(kwargs) != 0: # TODO: SUPPORT KWARGS raise ValueError("kwargs not supported") return {self._output_layer: self._callable_fn(x, **kwargs)} def reset_caching(self, x=None): """Sets the caching for storing the attack path.""" if x is not None: self._x_seq = CArray(x) # this is used for avoiding double storage # of the initial path self._is_init = True else: self._x_seq = None def get_variable_value(self, variable_name): return tf.get_default_graph().get_tensor_by_name( "{:}:0".format(variable_name)) class _CClassifierToTF: """ Creates a Tensorflow operation whose result are the scores produced by the discriminant function of a CClassifier subclass. Accordingly, the gradient of the discriminant function will be the gradient of the created Tensorflow operation. Parameters ---------- fun : CFunction that have as function the discriminant function of the classifier. out_dims : The number of output dimensions (classes) of the model. Returns ------- tf_model_fn A Tensorfow operation that maps an input (tf.Tensor) to the output of model discriminant function (tf.Tensor) and on which the tensorflow function "gradient" can be called to compute its gradient. """ def __init__(self, fun, out_dims=1): self.fun = fun self.out_dims = out_dims def __call__(self, x_op): """ Given an input, this function return a PyFunction (a Tensorflow operation) that compute the function "_fprop_fn" and whose gradient is the one give by the function "_tf_gradient_fn". Returns ------- out: PyFunction tensorflow operation on which the gradient function can be used to have its gradient. """ out = _py_func_with_gradient(self._fprop_fn, [x_op], Tout=[tf.float32], stateful=True, grad_func=self._tf_gradient_fn)[0] out.set_shape([None, self.out_dims]) return out def _fprop_fn(self, x_np): """Numpy function that computes and returns the output of the model. Parameters ---------- x_np: np.ndarray The input that should be classified by the model. Returns ------- scores: np.ndarray The scores given as output by the classifier. """ # compute the scores (the model output) f_x = self.fun.fun scores = f_x(CArray(x_np)).atleast_2d().tondarray().astype(np.float32) return scores def _np_grad_fn(self, x_np, grads_in_np=None): """ Function that compute the gradient of the classifier discriminant function Parameters ---------- x_np: np.ndarray Input samples (2D array) grads_in_np: np.ndarray Vector for which the gradient should be pre-multiplied. Returns ------- gradient: np.ndarray The gradient of the classifier discriminant function. """ x_carray = CArray(x_np).atleast_2d() grads_in_np = CArray(grads_in_np).atleast_2d() n_samples = x_carray.shape[0] if n_samples > 1: raise ValueError("The gradient of CCleverhansAttack can be " "computed only for one sample at time") grad_f_x = self.fun.gradient grads = grad_f_x(x_carray, w=grads_in_np).atleast_2d() return grads.tondarray().astype(np.float32) def _tf_gradient_fn(self, op, grads_in): """ Parameters ---------- op : numpy function. grads_in : input of the gradient function. Returns ------- gradient: PyFunction tensorflow operation that compute the gradient of a given function that works with numpy array. """ pyfun = tf.compat.v1.py_func( self._np_grad_fn, [op.inputs[0], grads_in], Tout=[tf.float32]) return pyfun def _py_func_with_gradient( func, inp, Tout, stateful=True, pyfun_name=None, grad_func=None): """ Given a function that returns as output a numpy array, and optionally a function that computes its gradient, this function returns a pyfunction. A pyfunction wraps a function that works with numpy arrays as an operation in a TensorFlow graph. credits: https://gist.github.com/kingspp/3ec7d9958c13b94310c1a365759aa3f4 Parameters ---------- func : Custom Function. inp : Function Inputs. Tout : Output Type of out Custom Function. stateful : Calculate Gradients when stateful is True. pyfun_name : Name of the PyFunction. grad_func : Custom Gradient Function. Returns ---------- fun : Pyfunction Tensorflow operation that compute the given function and eventually its gradient. """ # Generate random name in order to avoid conflicts with inbuilt names from random import getrandbits rnd_name = 'PyFuncGrad-' + '%0x' % getrandbits(30 * 4) # Register Tensorflow Gradient tf.RegisterGradient(rnd_name)(grad_func) # Get current graph g = tf.compat.v1.get_default_graph() # Add gradient override map with g.gradient_override_map( {"PyFunc": rnd_name, "PyFuncStateless": rnd_name}): return tf.compat.v1.py_func( func, inp, Tout, stateful=stateful, name=pyfun_name) class _CClvrh_params(CCreator): def __init__(self, param_dict): # create a property for each dictionary key self.__dict__ = param_dict