#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""A set of custom supervised classifiers suitable for the
`Record Linkage Toolkit <https://recordlinkage.readthedocs.io/>`_.
It includes
`neural networks <https://en.wikipedia.org/wiki/Artificial_neural_network>`_ and
`support-vector machines <https://en.wikipedia.org/wiki/Support-vector_machine>`_.
All classes implement :class:`recordlinkage.base.BaseClassifier`: typically,
you will use its :meth:`fit() <recordlinkage.NaiveBayesClassifier.fit>`,
:meth:`predict() <recordlinkage.NaiveBayesClassifier.predict>`, and
:meth:`prob() <recordlinkage.NaiveBayesClassifier.prob>` methods.
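A minimal usage sketch (hypothetical variable names; this module is assumed
to be importable as ``soweego.linker.classifiers``, feature vectors to come
from :meth:`recordlinkage.Compare.compute`, and ``match_index`` to be a
:class:`pandas.MultiIndex` of known matching pairs)::

    from soweego.linker import classifiers

    classifier = classifiers.SVCClassifier(kernel='linear')
    classifier.fit(training_feature_vectors, match_index)

    predictions = classifier.predict(classification_feature_vectors)
    probabilities = classifier.prob(classification_feature_vectors)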
"""
__author__ = 'Marco Fossati, Andrea Tupini'
__email__ = 'fossati@spaziodati.eu, tupini07@gmail.com'
__version__ = '1.0'
__license__ = 'GPL-3.0'
__copyright__ = 'Copyleft 2019, Hjfocs, tupini07'
import logging
import os
from contextlib import redirect_stderr
import numpy as np
import pandas as pd
from mlens.ensemble import SuperLearner
from recordlinkage.adapters import KerasAdapter, SKLearnAdapter
from recordlinkage.base import BaseClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import VotingClassifier as SKVotingClassifier
from sklearn.svm import SVC
from soweego.commons import constants, utils
with redirect_stderr(open(os.devnull, 'w')):
# When `keras` is imported, it prints a message to stderr
# saying which backend it's using. To avoid this, we
# redirect stderr to `devnull` for the statements in this block.
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.layers import BatchNormalization, Dense
from keras.models import Sequential
from keras.wrappers.scikit_learn import KerasClassifier
LOGGER = logging.getLogger(__name__)
# Small wrapper around `KerasClassifier`. Its only purpose is to override
# the `predict` method so that the returned output has shape (n_samples,)
# instead of (n_samples, 1).
class _KerasClassifierWrapper(KerasClassifier):
def predict(self, x, **kwargs):
return super(_KerasClassifierWrapper, self).predict(x, **kwargs)[:, 0]
def _get_proba_sklearn_base_classifier(
clf: BaseClassifier, features: pd.DataFrame
) -> pd.Series:
"""Returns the probabilities of a positive match by applying the
classifier to the provided feature vectors"""
match_class = clf.kernel.classes_[1]
assert match_class == 1, (
f'Invalid match class label: {match_class}. '
'clf.kernel.predict_proba() expects the second class '
'in the trained model to be 1'
)
# In the result, rows are record pairs and columns are classes.
# We are in a binary setting, so there are 2 classes:
# `0` for non-matches, `1` for matches.
# We only need the probability of being a match,
# so we return the second column.
classifications = clf.kernel.predict_proba(features)[:, 1]
return pd.Series(classifications, index=features.index)
# Base class that implements the training method required by
# `recordlinkage.adapters.KerasAdapter._fit`,
# shared across neural network implementations.
class _BaseNeuralNetwork(KerasAdapter, BaseClassifier):
def _fit(
self,
feature_vectors: pd.Series,
answers: pd.Series = None,
batch_size: int = None,
epochs: int = None,
validation_split: float = constants.VALIDATION_SPLIT,
) -> None:
# If batch size or epochs were not passed as arguments, fall back to the
# instance attributes, which default to the values defined in `constants`.
if batch_size is None:
batch_size = self.batch_size
if epochs is None:
epochs = self.epochs
model_path = os.path.join(
constants.WORK_DIR,
constants.NEURAL_NETWORK_CHECKPOINT_MODEL.format(self.__class__.__name__),
)
os.makedirs(os.path.dirname(model_path), exist_ok=True)
history = self.kernel.fit(
x=feature_vectors,
y=answers,
validation_split=validation_split,
batch_size=batch_size,
epochs=epochs,
verbose=1,
callbacks=[
EarlyStopping(
monitor='val_loss',
patience=100,
verbose=2,
restore_best_weights=True,
),
ModelCheckpoint(model_path, save_best_only=True),
],
)
LOGGER.info('Fit parameters: %s', history.params)
def _create_model(self, **kwargs):
raise NotImplementedError(
'Subclasses need to implement the "_create_model" method.'
)
def _predict(self, values):
return self.kernel.predict(values)
def __repr__(self):
return (
f'{self.__class__.__name__}('
f'optimizer={self.optimizer.__class__.__name__}, '
f'loss={self.loss}, '
f'metrics={self.metrics})'
)
class _MLensAdapter(SKLearnAdapter, BaseClassifier):
"""
Wrapper around :class:`recordlinkage.adapters.SKLearnAdapter` and
:class:`recordlinkage.base.BaseClassifier`, to be used as the parent class of
any classifier whose kernel is a subclass of :class:`mlens.ensemble.base.BaseEnsemble`.
This *adapter* implements the *prob* and *_predict* methods so that
the classifier can be properly used within the *recordlinkage* framework.
"""
def __init__(self, **kwargs):
super(_MLensAdapter, self).__init__()
def _check_correct_pred_shape(self, preds: np.ndarray):
"""
Sanity check: ensure that the *meta* model of the ensemble
actually outputs probabilities for exactly two classes.
"""
n_classes = preds.shape[1]
if n_classes != 2:
err_msg = (
"We're doing binary classification and we expect "
f"probabilities for only two classes, however "
f"we received '{n_classes}' classes."
)
LOGGER.critical(err_msg)
raise AssertionError(err_msg)
def prob(self, feature_vectors: pd.DataFrame) -> pd.Series:
"""Classify record pairs and include the probability score
of being a match.
:param feature_vectors: a :class:`DataFrame <pandas.DataFrame>`
computed via record pairs comparison. This should be
:meth:`recordlinkage.Compare.compute` output.
See :func:`extract_features() <soweego.linker.workflow.extract_features>`
for more details
:return: the classification results
"""
# mlens `predict` method returns probabilities
classifications = self.kernel.predict(feature_vectors)
self._check_correct_pred_shape(classifications)
# we're only interested in the probability for the positive
# case
classifications = classifications[:, 1]
return pd.Series(classifications, index=feature_vectors.index)
def _predict(self, features) -> np.ndarray:
# mlens' `predict` method returns probabilities. Since this is a
# binary classification problem, we keep the probability of the
# positive class and round it to either 0 or 1.
prediction = super(_MLensAdapter, self)._predict(features)
self._check_correct_pred_shape(prediction)
prediction = prediction[:, 1]
prediction = np.array([round(x) for x in prediction])
return prediction
class SVCClassifier(SKLearnAdapter, BaseClassifier):
"""A support-vector machine classifier.
This class implements :class:`sklearn.svm.SVC`,
which is based on the `libsvm <https://www.csie.ntu.edu.tw/~cjlin/libsvm/>`_
library.
This classifier differs from
:class:`recordlinkage.classifiers.SVMClassifier`, which implements
:class:`sklearn.svm.LinearSVC`, based on the
`liblinear <https://www.csie.ntu.edu.tw/~cjlin/liblinear/>`_ library.
Main highlights:
- outputs probability scores
- can use non-linear kernels
- higher training time (quadratic in the number of samples)
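
For instance, a minimal sketch (extra keyword arguments are forwarded as-is
to :class:`sklearn.svm.SVC`, and ``probability`` defaults to ``True`` so that
:meth:`prob` works; variable names are placeholders for your own data)::

    classifier = SVCClassifier(kernel='rbf', C=1.0)
    classifier.fit(training_feature_vectors, match_index)
    scores = classifier.prob(classification_feature_vectors)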
"""
def __init__(self, *args, **kwargs):
super(SVCClassifier, self).__init__()
kwargs['probability'] = kwargs.get('probability', True)
self.kernel = SVC(*args, **kwargs)
def prob(self, feature_vectors: pd.DataFrame) -> pd.Series:
"""Classify record pairs and include the probability score
of being a match.
:param feature_vectors: a :class:`DataFrame <pandas.DataFrame>`
computed via record pairs comparison. This should be
:meth:`recordlinkage.Compare.compute` output.
See :func:`extract_features() <soweego.linker.workflow.extract_features>`
for more details
:return: the classification results
"""
return _get_proba_sklearn_base_classifier(self, feature_vectors)
def __repr__(self):
return f'{self.kernel}'
class RandomForest(SKLearnAdapter, BaseClassifier):
"""A Random Forest classifier.
This class implements :class:`sklearn.ensemble.RandomForestClassifier` and accepts
the same parameters.
It fits multiple decision trees on sub-samples of the dataset and
averages their predictions, which improves accuracy and reduces over-fitting.
The default parameters are:
- **n_estimators**: 500
- **criterion**: entropy
- **max_features**: None
- **bootstrap**: True
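
For instance, a sketch that overrides some of the defaults (any keyword
argument accepted by :class:`sklearn.ensemble.RandomForestClassifier` can be
passed; values and variable names are illustrative)::

    classifier = RandomForest(n_estimators=100, max_depth=10)
    classifier.fit(training_feature_vectors, match_index)
    scores = classifier.prob(classification_feature_vectors)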
"""
def __init__(self, *args, **kwargs):
super(RandomForest, self).__init__()
kwargs = {**constants.RANDOM_FOREST_PARAMS, **kwargs}
self.kernel = RandomForestClassifier(*args, **kwargs)
def prob(self, feature_vectors: pd.DataFrame) -> pd.Series:
"""Classify record pairs and include the probability score
of being a match.
:param feature_vectors: a :class:`DataFrame <pandas.DataFrame>`
computed via record pairs comparison. This should be
:meth:`recordlinkage.Compare.compute` output.
See :func:`extract_features() <soweego.linker.workflow.extract_features>`
for more details
:return: the classification results
"""
return _get_proba_sklearn_base_classifier(self, feature_vectors)
def __repr__(self):
return f'{self.kernel}'
class SingleLayerPerceptron(_BaseNeuralNetwork):
"""A single-layer perceptron classifier.
This class implements a
`keras.Sequential <https://keras.io/models/sequential/>`_ model
with the following default architecture:
- single `Dense <https://keras.io/layers/core/>`_ layer
- ``sigmoid`` activation function
- ``adam`` optimizer
- ``binary_crossentropy`` loss function
- ``accuracy`` metric for evaluation
If you want to override the default parameters, you can pass the following
keyword arguments to the constructor:
- **activation** - see
`available activations <https://keras.io/activations/>`_
- **optimizer** - see
`optimizers <https://keras.io/optimizers/>`_
- **loss** - see
`available loss functions <https://keras.io/losses/>`_
- **metrics** - see
`available metrics <https://keras.io/metrics/>`_
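
For instance, a sketch with overridden defaults (``num_features`` must equal
the number of columns of your feature vectors; the other values are
illustrative)::

    classifier = SingleLayerPerceptron(
        num_features=training_feature_vectors.shape[1],
        activation='sigmoid',
        optimizer='adam',
    )
    classifier.fit(training_feature_vectors, match_index)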
"""
def __init__(self, num_features, **kwargs):
super(SingleLayerPerceptron, self).__init__()
kwargs = {**constants.SINGLE_LAYER_PERCEPTRON_PARAMS, **kwargs}
self.num_features = num_features
self.loss = kwargs.get('loss', constants.LOSS)
self.metrics = kwargs.get('metrics', constants.METRICS)
self.epochs = kwargs.get('epochs')
self.batch_size = kwargs.get('batch_size')
self.activation = kwargs.get('activation')
self.optimizer = kwargs.get('optimizer')
model = _KerasClassifierWrapper(
self._create_model,
activation=self.activation,
optimizer=self.optimizer,
)
self.kernel = model
def _create_model(self, activation=None, optimizer=None):
if optimizer is None:
optimizer = self.optimizer
if activation is None:
activation = self.activation
model = Sequential()
model.add(Dense(1, input_dim=self.num_features, activation=activation))
model.compile(optimizer=optimizer, loss=self.loss, metrics=self.metrics)
return model
class MultiLayerPerceptron(_BaseNeuralNetwork):
"""A multi-layer perceptron classifier.
This class implements a
`keras.Sequential <https://keras.io/models/sequential/>`_ model
with the following default architecture:
- `Dense <https://keras.io/layers/core/>`_ layer 1, with
``128`` output dimension and ``relu`` activation function
- `BatchNormalization <https://keras.io/layers/normalization/>`_ layer
- `Dense <https://keras.io/layers/core/>`_ layer 2, with
``32`` output dimension and ``relu`` activation function
- `BatchNormalization <https://keras.io/layers/normalization/>`_ layer
- `Dense <https://keras.io/layers/core/>`_ layer 3, with
``1`` output dimension and ``sigmoid`` activation function
- ``adadelta`` optimizer
- ``binary_crossentropy`` loss function
- ``accuracy`` metric for evaluation
If you want to override the default parameters, you can pass the following
keyword arguments to the constructor:
- **hidden_activation** and **output_activation** - see
`available activations <https://keras.io/activations/>`_
- **hidden_layer_dims** - a sequence with the output dimension of
each hidden `Dense <https://keras.io/layers/core/>`_ layer
- **optimizer** - see
`optimizers <https://keras.io/optimizers/>`_
- **loss** - see
`available loss functions <https://keras.io/losses/>`_
- **metrics** - see
`available metrics <https://keras.io/metrics/>`_
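
For instance, a sketch with custom hidden layers (values and variable names
are illustrative)::

    classifier = MultiLayerPerceptron(
        num_features=training_feature_vectors.shape[1],
        hidden_layer_dims=(256, 64),
        hidden_activation='relu',
        output_activation='sigmoid',
    )
    classifier.fit(training_feature_vectors, match_index)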
"""
def __init__(self, num_features, **kwargs):
super(MultiLayerPerceptron, self).__init__()
kwargs = {**constants.MULTI_LAYER_PERCEPTRON_PARAMS, **kwargs}
self.num_features = num_features
self.loss = kwargs.get('loss', constants.LOSS)
self.metrics = kwargs.get('metrics', constants.METRICS)
self.epochs = kwargs.get('epochs')
self.batch_size = kwargs.get('batch_size')
self.optimizer = kwargs.get('optimizer')
self.hidden_activation = kwargs.get('hidden_activation')
self.output_activation = kwargs.get('output_activation')
self.hidden_layer_dims = kwargs.get('hidden_layer_dims')
model = _KerasClassifierWrapper(
self._create_model,
optimizer=self.optimizer,
hidden_activation=self.hidden_activation,
output_activation=self.output_activation,
hidden_layer_dims=self.hidden_layer_dims,
)
self.kernel = model
def _create_model(
self,
optimizer=None,
hidden_activation=None,
output_activation=None,
hidden_layer_dims=None,
):
if optimizer is None:
optimizer = self.optimizer
if hidden_activation is None:
hidden_activation = self.hidden_activation
if output_activation is None:
output_activation = self.output_activation
if hidden_layer_dims is None:
hidden_layer_dims = self.hidden_layer_dims
model = Sequential()
for i, dim in enumerate(hidden_layer_dims):
if i == 0: # is first layer
model.add(
Dense(
dim,
input_dim=self.num_features,
activation=hidden_activation,
)
)
else:
model.add(Dense(dim, activation=hidden_activation))
model.add(BatchNormalization())
model.add(Dense(1, activation=output_activation))
model.compile(optimizer=optimizer, loss=self.loss, metrics=self.metrics)
return model
class VotingClassifier(SKLearnAdapter, BaseClassifier):
"""A basic ensemble classifier which uses a voting procedure to decide the final
outcome of a prediction.
This class implements :class:`sklearn.ensemble.VotingClassifier`.
It combines a set of classifiers and uses majority vote or
average predicted probabilities to pick the final prediction.
See scikit's
`user guide <https://scikit-learn.org/stable/modules/ensemble.html#voting-classifier>`_.
The `voting` parameter accepts either **"hard"** or **"soft"**:
- **hard** - the label predicted by the majority of base classifiers is used as the
final prediction. Note that this yields labels only, not probabilities.
- **soft** - each base classifier outputs the probability that a pair is a match,
and the final prediction is the average of those probabilities.
By default, ``voting='soft'``.
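
For instance, a sketch using hard voting (``num_features`` is forwarded to the
base classifiers; remember that with ``voting='hard'`` the :meth:`prob` method
returns predicted labels rather than probabilities)::

    classifier = VotingClassifier(
        num_features=training_feature_vectors.shape[1], voting='hard'
    )
    classifier.fit(training_feature_vectors, match_index)
    predictions = classifier.prob(classification_feature_vectors)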
"""
def __init__(self, num_features, **kwargs):
super(VotingClassifier, self).__init__()
kwargs = {**constants.VOTING_CLASSIFIER_PARAMS, **kwargs}
voting = kwargs.pop('voting')
self.num_features = num_features
estimators = []
for clf in constants.CLASSIFIERS_FOR_ENSEMBLE:
model = utils.init_model(clf, num_features=num_features, **kwargs)
estimators.append((clf, model.kernel))
# use as kernel the VotingClassifier coming from sklearn
self.kernel = SKVotingClassifier(
estimators=estimators, voting=voting, n_jobs=None
)
def prob(self, feature_vectors: pd.DataFrame) -> pd.Series:
"""Classify record pairs and include the probability score
of being a match.
:param feature_vectors: a :class:`DataFrame <pandas.DataFrame>`
computed via record pairs comparison. This should be
:meth:`recordlinkage.Compare.compute` output.
See :func:`extract_features() <soweego.linker.workflow.extract_features>`
for more details
:return: the classification results
"""
match_class = self.kernel.classes_[1]
assert match_class == 1, (
f'Invalid match class label: {match_class}. '
'sklearn.ensemble.VotingClassifier.predict_proba() expects the second class '
'in the trained model to be 1'
)
if self.kernel.voting == 'hard':
classifications = self.kernel.predict(feature_vectors)
else:
# get only the probability that pairs are a match
classifications = self.kernel.predict_proba(feature_vectors)[:, 1]
return pd.Series(classifications, index=feature_vectors.index)
def __repr__(self):
return f'{self.kernel}'
class GatedEnsembleClassifier(_MLensAdapter):
"""Ensemble of classifiers, whose predictions are joined by using
a further meta-learner, which decides the final output based on the
prediction of the base classifiers.
This classifier uses :class:`mlens.ensemble.SuperLearner`
to implement the *gating* functionality.
The parameters, and their default values, are:
- **meta_layer**: Name of the classifier to use as a *meta layer*. By
default this is `single_layer_perceptron`
- **folds**: The number of folds to use for cross validation when
generating the training set for the **meta_layer**. The default
value for this is `2`.
For a better explanation of this parameter, see:
*Polley, Eric C.
and van der Laan, Mark J., “Super Learner In Prediction” (May 2010).
U.C. Berkeley Division of Biostatistics Working Paper Series.
Working Paper 266*
`<https://biostats.bepress.com/ucbbiostat/paper266/>`_
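
For instance, a sketch with a custom meta layer and more folds
(``'random_forest'`` is assumed here to be a recognized classifier name;
the default is ``'single_layer_perceptron'``; values are illustrative)::

    classifier = GatedEnsembleClassifier(
        num_features=training_feature_vectors.shape[1],
        meta_layer='random_forest',
        folds=5,
    )
    classifier.fit(training_feature_vectors, match_index)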
"""
def __init__(self, num_features, **kwargs):
super(GatedEnsembleClassifier, self).__init__()
kwargs = {**constants.GATED_ENSEMBLE_PARAMS, **kwargs}
self.num_features = num_features
self.num_folds = kwargs.pop('folds', 2)
self.meta_layer = kwargs.pop('meta_layer')
estimators = []
for clf in constants.CLASSIFIERS_FOR_ENSEMBLE:
model = utils.init_model(clf, num_features=self.num_features, **kwargs)
estimators.append((clf, model.kernel))
self.kernel = SuperLearner(verbose=2, n_jobs=1, folds=self.num_folds)
# use as output the probability of a given class (not just
# the class itself)
self.kernel.add(estimators, proba=True)
self.kernel.add_meta(
utils.init_model(
self.meta_layer, len(estimators) * self.num_folds, **kwargs
).kernel,
proba=True,
)
def __repr__(self):
return (
f'{self.__class__.__name__}('
f'num_folds={self.num_folds}, '
f'meta_layer={self.meta_layer})'
)
class StackedEnsembleClassifier(_MLensAdapter):
"""Ensemble of stacked classifiers, meaning that classifiers are arranged in layers
with the next layer getting as input the output of the last layer.
The predictions of the final layer are merged with a meta-learner (the same happens for
~:class:`soweego.linker.GatedEnsembleClassifier`), which decides the final
output based on the prediction of the base classifiers.
This classifier uses :class:`mlens.ensemble.SuperLearner`
to implement the *stacking* functionality.
The parameters, and their default values, are:
- **meta_layer**: Name of the classifier to use as a *meta layer*. By
default this is `single_layer_perceptron`
- **folds**: The number of folds to use for cross validation when
generating the training set for the **meta_layer**. The default
value for this is `2`.
For a better explanation of this parameter, see:
*Polley, Eric C.
and van der Laan, Mark J., “Super Learner In Prediction” (May 2010).
U.C. Berkeley Division of Biostatistics Working Paper Series.
Working Paper 266*
`<https://biostats.bepress.com/ucbbiostat/paper266/>`_
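
For instance, a minimal sketch relying on the default parameters (variable
names are placeholders for your own data)::

    classifier = StackedEnsembleClassifier(
        num_features=training_feature_vectors.shape[1]
    )
    classifier.fit(training_feature_vectors, match_index)
    scores = classifier.prob(classification_feature_vectors)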
"""
def __init__(self, num_features, **kwargs):
super(StackedEnsembleClassifier, self).__init__()
kwargs = {**constants.STACKED_ENSEMBLE_PARAMS, **kwargs}
self.num_features = num_features
self.num_folds = kwargs.pop('folds', 2)
self.meta_layer = kwargs.pop('meta_layer')
def init_estimators(num_features):
estimators = []
for clf in constants.CLASSIFIERS_FOR_ENSEMBLE:
model = utils.init_model(clf, num_features=num_features, **kwargs)
estimators.append((clf, model.kernel))
return estimators
self.kernel = SuperLearner(verbose=2, n_jobs=1, folds=self.num_folds)
l1_estimators = init_estimators(self.num_features)
self.kernel.add(l1_estimators, proba=True)
l2_estimators = init_estimators(len(l1_estimators) * self.num_folds)
self.kernel.add(l2_estimators, proba=True)
self.kernel.add_meta(
utils.init_model(
self.meta_layer, len(l2_estimators) * self.num_folds, **kwargs
).kernel,
proba=True,
)
def __repr__(self):
return (
f'{self.__class__.__name__}('
f'num_folds={self.num_folds}, '
f'meta_layer={self.meta_layer})'
)