Source code for secml.ml.classifiers.sklearn.c_classifier_sgd

"""
.. module:: CClassifierSGD
   :synopsis: Stochastic Gradient Descent (SGD) classifier

.. moduleauthor:: Marco Melis <marco.melis@unica.it>

"""
import warnings

from sklearn import linear_model

from secml.array import CArray
from secml.ml.classifiers import CClassifierLinear
from secml.ml.classifiers.loss import CLoss
from secml.ml.classifiers.regularizer import CRegularizer
from secml.ml.kernels import CKernel
from secml.utils.mixed_utils import check_is_fitted
from secml.ml.classifiers.gradients import CClassifierGradientSGDMixin


class CClassifierSGD(CClassifierLinear, CClassifierGradientSGDMixin):
    """Stochastic Gradient Descent Classifier.

    Parameters
    ----------
    loss : CLoss
        Loss function to be used during classifier training.
    regularizer : CRegularizer
        Regularizer function to be used during classifier training.
    kernel : None or CKernel subclass, optional

        .. deprecated:: 0.12

        Instance of a CKernel subclass to be used for computing similarity
        between patterns. If None (default), a linear classifier will be
        created. In the future this parameter will be removed from this
        classifier and kernels will have to be passed as preprocess.
    alpha : float, optional
        Constant that multiplies the regularization term. Default 0.01.
        Also used to compute the learning rate when set to 'optimal'.
    fit_intercept : bool, optional
        If True (default), the intercept is calculated, else no intercept
        will be used in calculations (e.g. data is expected to be already
        centered).
    max_iter : int, optional
        The maximum number of passes over the training data (aka epochs).
        Default 1000.
    tol : float or None, optional
        The stopping criterion. If it is not None, the iterations will stop
        when (loss > best_loss - tol) for 5 consecutive epochs. Default None.
    shuffle : bool, optional
        If True (default), the training data is shuffled after each epoch.
    learning_rate : str, optional
        The learning rate schedule. If 'constant', eta = eta0;
        if 'optimal' (default), eta = 1.0 / (alpha * (t + t0)), where t0 is
        chosen by a heuristic proposed by Leon Bottou; if 'invscaling',
        eta = eta0 / pow(t, power_t); if 'adaptive', eta = eta0, as long as
        the training loss keeps decreasing.
    eta0 : float, optional
        The initial learning rate for the 'constant', 'invscaling' or
        'adaptive' schedules. Default 10.0.
    power_t : float, optional
        The exponent for inverse scaling learning rate. Default 0.5.
    class_weight : {dict, 'balanced', None}, optional
        Set the parameter C of class i to `class_weight[i] * C`.
        If not given (default), all classes are supposed to have weight one.
        The 'balanced' mode uses the values of labels to automatically adjust
        weights inversely proportional to class frequencies as
        `n_samples / (n_classes * np.bincount(y))`.
    warm_start : bool, optional
        If True, reuse the solution of the previous call to fit as
        initialization; otherwise, just erase the previous solution.
        Default False.
    average : bool or int, optional
        If True, computes the averaged SGD weights and stores the result in
        the `coef_` attribute. If set to an int greater than 1, averaging
        will begin once the total number of samples seen reaches average.
        Default False.
    random_state : int, RandomState or None, optional
        The seed of the pseudo random number generator to use when shuffling
        the data. If int, random_state is the seed used by the random number
        generator; if RandomState instance, random_state is the random number
        generator; if None, the random number generator is the RandomState
        instance used by `np.random`. Default None.
    preprocess : CPreProcess or str or None, optional
        Features preprocess to be applied to input data.
        Can be a CPreProcess subclass or a string with the type of the
        desired preprocessor. If None, input data is used as is.

    Attributes
    ----------
    class_type : 'sgd'

    """
    __class_type = 'sgd'

    def __init__(self, loss, regularizer, kernel=None,
                 alpha=0.01, fit_intercept=True,
                 max_iter=1000, tol=None,
                 shuffle=True, learning_rate='optimal',
                 eta0=10.0, power_t=0.5, class_weight=None,
                 warm_start=False, average=False,
                 random_state=None, preprocess=None):

        # Calling the superclass init
        CClassifierLinear.__init__(self, preprocess=preprocess)

        # Keep private (not an sklearn SGD parameter)
        self._loss = CLoss.create(loss)
        # Keep private (not an sklearn SGD parameter)
        self._regularizer = CRegularizer.create(regularizer)

        # Classifier parameters
        self.alpha = alpha
        self.fit_intercept = fit_intercept
        self.max_iter = max_iter
        self.tol = tol
        self.shuffle = shuffle
        self.learning_rate = learning_rate
        self.eta0 = eta0
        self.power_t = power_t
        self.class_weight = class_weight
        self.warm_start = warm_start
        self.average = average
        self.random_state = random_state

        # Similarity function (bound) to use for computing features
        # Keep private (not a parameter of sklearn SGD)
        if kernel is not None:
            warnings.warn("`kernel` parameter in `CClassifierSGD` is "
                          "deprecated from 0.12, in the future kernels will "
                          "have to be passed as preprocess.",
                          DeprecationWarning)
        self._kernel = kernel if kernel is None else CKernel.create(kernel)

        self._tr = None  # slot for the training data
    def is_kernel_linear(self):
        """Return True if the kernel is None or linear."""
        if self.kernel is None or self.kernel.class_type == 'linear':
            return True
        return False
    def _check_is_fitted(self):
        """Check if the classifier is trained (fitted).

        Raises
        ------
        NotFittedError
            If the classifier is not fitted.

        """
        if not self.is_kernel_linear():
            check_is_fitted(self, '_tr')
        super(CClassifierSGD, self)._check_is_fitted()

    @property
    def loss(self):
        """Returns the loss function used by the classifier."""
        return self._loss

    @property
    def regularizer(self):
        """Returns the regularizer function used by the classifier."""
        return self._regularizer

    @property
    def alpha(self):
        """Returns the constant that multiplies the regularization term."""
        return self._alpha

    @alpha.setter
    def alpha(self, value):
        """Sets the constant that multiplies the regularization term."""
        self._alpha = float(value)

    @property
    def C(self):
        """Constant that multiplies the regularization term.

        Equal to 1 / alpha.

        """
        return 1.0 / self.alpha

    @property
    def class_weight(self):
        """Weight of each training class."""
        return self._class_weight

    @class_weight.setter
    def class_weight(self, value):
        """Sets the weight of each training class."""
        if isinstance(value, dict) and len(value) != 2:
            raise ValueError("weight of positive (+1) and negative (0) "
                             "classes only must be specified.")
        self._class_weight = value

    @property
    def average(self):
        """When set to True, computes the averaged SGD weights.

        If set to an int greater than 1, averaging will begin once the
        total number of samples seen reaches average. So average=10 will
        begin averaging after seeing 10 samples.

        """
        return self._average

    @average.setter
    def average(self, value):
        """Sets the average parameter."""
        self._average = value

    @property
    def eta0(self):
        """The initial learning rate for the `invscaling` learning rate.

        Default is 10.0 (corresponding to sqrt(1.0/sqrt(alpha)),
        with alpha=0.0001).

        """
        return self._eta0

    @eta0.setter
    def eta0(self, value):
        """Sets eta0."""
        self._eta0 = float(value)

    @property
    def power_t(self):
        """The exponent for inverse scaling learning rate."""
        return self._power_t

    @power_t.setter
    def power_t(self, value):
        """Sets power_t."""
        self._power_t = float(value)

    @property
    def kernel(self):
        """Kernel function."""
        return self._kernel

    @kernel.setter
    def kernel(self, kernel):
        """Sets the kernel function (None for a linear classifier).

        This property is deprecated, as in the future the kernel will
        have to be passed as preprocess.

        """
        warnings.warn("`kernel` parameter in `CClassifierSGD` is "
                      "deprecated from 0.12, in the future kernels will "
                      "have to be passed as preprocess.", DeprecationWarning)
        self._kernel = kernel

    @property
    def tr(self):
        """Training set."""
        return self._tr

    @property
    def n_tr_samples(self):
        """Returns the number of training samples."""
        return self._tr.shape[0] if self._tr is not None else None

    def _fit(self, dataset):
        """Trains the One-Vs-All SGD classifier.

        The following is a private method computing one single
        binary (2-classes) classifier of the OVA schema.

        Representation of each classifier attribute for the multiclass
        case is explained in the corresponding property description.

        Parameters
        ----------
        dataset : CDataset
            Binary (2-classes) training set. Must be a :class:`.CDataset`
            instance with patterns data and corresponding labels.

        Returns
        -------
        trained_cls : classifier
            Instance of the used solver trained using input dataset.

        """
        # TODO: remove this check from here.
        #  It should be handled in CClassifierLinear
        if dataset.num_classes != 2:
            raise ValueError("training can be performed on binary "
                             "(2-classes) datasets only.")

        # TODO: remove this object init from here. Put into constructor.
        #  See RandomForest / DT and inherit from CClassifierSKlearn.
        # Setting up classifier parameters
        sgd = linear_model.SGDClassifier(
            loss=self.loss.class_type,
            penalty=self.regularizer.class_type,
            alpha=self.alpha,
            fit_intercept=self.fit_intercept,
            max_iter=self.max_iter,
            tol=self.tol,
            shuffle=self.shuffle,
            learning_rate=self.learning_rate,
            eta0=self.eta0,
            power_t=self.power_t,
            class_weight=self.class_weight,
            average=self.average,
            warm_start=self.warm_start,
            random_state=self.random_state)

        # Pass loss function parameters to the classifier
        sgd.set_params(**self.loss.get_params())
        # Pass regularizer function parameters to the classifier
        sgd.set_params(**self.regularizer.get_params())

        # TODO: remove unconventional kernel usage. This is a linear classifier

        # Storing the training data (used by the decision function
        # for the kernel mapping)
        self._tr = dataset.X if not self.is_kernel_linear() else None

        # Training the SGD classifier, mapping data in kernel space if needed
        if self.is_kernel_linear():
            sgd.fit(dataset.X.get_data(), dataset.Y.tondarray())
        else:
            sgd.fit(CArray(
                self.kernel.k(dataset.X)).get_data(), dataset.Y.tondarray())

        # Temporarily storing attributes of the trained classifier
        self._w = CArray(sgd.coef_, tosparse=dataset.issparse).ravel()
        if self.fit_intercept is True:
            self._b = CArray(sgd.intercept_)[0]
        else:
            self._b = None

        return sgd

    # TODO: this function can be removed when removing kernel support
    def _forward(self, x):
        """Computes the distance of each pattern in x from the
        separating hyperplane.

        The scores are computed in kernel space if a kernel is defined.

        Parameters
        ----------
        x : CArray
            Array with new patterns to classify, 2-Dimensional of shape
            (n_patterns, n_features).

        Returns
        -------
        score : CArray
            Value of the decision function for each test pattern.

        """
        # Compute the decision function in kernel space if necessary
        k = x if self.is_kernel_linear() else \
            CArray(self.kernel.k(x, self._tr))
        # Scores are given by the linear model
        return CClassifierLinear._forward(self, k)

    def _backward(self, w=None):
        """Computes the gradient of the linear classifier's decision function
        wrt its input.

        For linear classifiers, the gradient wrt the input x is equal to
        the weight vector w, regardless of x.

        Parameters
        ----------
        w : CArray or None
            If CArray, it is pre-multiplied to the gradient of the module,
            as in standard reverse-mode autodiff.

        Returns
        -------
        gradient : CArray
            The gradient of the linear classifier's decision function
            wrt its input. Vector-like array.

        """
        if self.is_kernel_linear():
            # Simply return w for the linear case
            gradient = self.w.ravel()
        else:
            self.kernel.reference_samples = self._tr
            gradient = self.kernel.gradient(self._cached_x).atleast_2d()

            # A few shape checks to ensure broadcasting works correctly
            if gradient.shape != (self._tr.shape[0], self.n_features):
                raise ValueError("Gradient shape must be ({:}, {:})".format(
                    self._tr.shape[0], self.n_features))

            w_2d = self.w.atleast_2d()
            if gradient.issparse is True:  # To ensure the sparse dot is used
                w_2d = w_2d.tosparse()
            if w_2d.shape != (1, self._tr.shape[0]):
                raise ValueError(
                    "Weight vector shape must be ({:}, {:}) "
                    "or ravel equivalent".format(1, self._tr.shape[0]))

            gradient = w_2d.dot(gradient)

        # Gradient sign depends on the input label (0/1)
        if w is not None:
            return w[0] * -gradient + w[1] * gradient
        else:
            raise ValueError("w cannot be set as None.")
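
# ---------------------------------------------------------------------------
# Usage sketch (an illustrative addition, not part of the original module).
# It assumes the 'hinge' loss and 'l2' regularizer class types and the
# CDLRandom loader and CMetricAccuracy metric shipped elsewhere in secml;
# fit() is called here with a CDataset, matching the _fit signature above,
# but the public fit signature may differ across secml versions.
if __name__ == '__main__':
    from secml.data.loader import CDLRandom
    from secml.ml.peval.metrics import CMetricAccuracy

    # Binary random dataset
    ds = CDLRandom(n_samples=100, n_classes=2, random_state=0).load()

    # Hinge loss + L2 regularizer: a linear SVM-like model trained via SGD
    clf = CClassifierSGD(loss='hinge', regularizer='l2',
                         alpha=0.01, max_iter=1000, random_state=0)
    clf.fit(ds)

    preds = clf.predict(ds.X)
    acc = CMetricAccuracy().performance_score(y_true=ds.Y, y_pred=preds)
    print("Training accuracy: {:}".format(acc))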