#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""A set of custom features suitable for the
`Record Linkage Toolkit <https://recordlinkage.readthedocs.io/>`_,
where feature extraction stands for *record pairs comparison*.

**Input:** pairs of :class:`list` objects
coming from Wikidata and target catalog :class:`pandas.DataFrame` columns as per
:func:`preprocess_wikidata() <soweego.linker.workflow.preprocess_wikidata>` and
:func:`preprocess_target() <soweego.linker.workflow.preprocess_target>` output.

**Output:** a *feature vector* :class:`pandas.Series`.

All classes in this module share the following constructor parameters:

- **left_on** (str) - a Wikidata column label
- **right_on** (str) - a target catalog column label
- **missing_value** - (optional) a score to fill null values
- **label** - (optional) a label for the output feature
  :class:`Series <pandas.Series>`

Specific parameters are documented in the *__init__* method of each class.

All classes in this module implement
:class:`recordlinkage.base.BaseCompareFeature`, and can be
added to the feature extractor object :class:`recordlinkage.Compare`.

Usage::

>>> import recordlinkage as rl
>>> from soweego.linker import features
>>> extractor = rl.Compare()
>>> source_column, target_column = 'birth_name', 'fullname'
>>> feature = features.ExactMatch(source_column, target_column)
>>> extractor.add(feature)
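>>> # To compute the feature vectors, pass a `pandas.MultiIndex` of candidate
>>> # record pairs and the two preprocessed DataFrames to the extractor, e.g.,
>>> # `extractor.compute(candidate_pairs, wikidata, target)`.
>>> # (`candidate_pairs`, `wikidata`, and `target` are illustrative names,
>>> # not part of this module.)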

"""

__author__ = 'Marco Fossati'
__email__ = 'fossati@spaziodati.eu'
__version__ = '1.0'
__license__ = 'GPL-3.0'
__copyright__ = 'Copyleft 2018, Hjfocs'

import itertools
import logging
from multiprocessing import Manager
from typing import List, Set, Tuple

import jellyfish
import numpy as np
import pandas as pd
from recordlinkage.base import BaseCompareFeature
from recordlinkage.utils import fillna
from sklearn.feature_extraction.text import CountVectorizer

from soweego.commons import constants, text_utils
from soweego.wikidata import sparql_queries

LOGGER = logging.getLogger(__name__)

# When calling `SharedOccupations._expand_occupations`, it's useful to have
# a concurrency-safe dict where we cache the return value.
# In this way, all workers of a parallel feature extractor execution,
# e.g., `recordlinkage.Compare(n_jobs=4)`,
# will have access to the dict, thus drastically decreasing
# the number of SPARQL queries needed.
# This cache is also preserved across executions of the feature extractor.
_threading_manager = Manager()
_global_occupations_qid_cache = _threading_manager.dict()


# Adapted from https://github.com/J535D165/recordlinkage/blob/master/recordlinkage/compare.py
# See RECORDLINKAGE_LICENSE
class ExactMatch(BaseCompareFeature):
    """Compare pairs of lists through exact match on each pair of elements."""

    name = 'exact_match'
    description = (
        'Compare pairs of lists through exact match on each pair of elements.'
    )

    def __init__(
        self,
        left_on: str,
        right_on: str,
        match_value: float = 1.0,
        non_match_value: float = 0.0,
        missing_value: float = constants.FEATURE_MISSING_VALUE,
        label: str = None,
    ):
        """
        :param left_on: a Wikidata :class:`DataFrame <pandas.DataFrame>`
          column label
        :param right_on: a target catalog :class:`DataFrame <pandas.DataFrame>`
          column label
        :param match_value: (optional) a score when element pairs match
        :param non_match_value: (optional) a score when element pairs
          do not match
        :param missing_value: (optional) a score to fill null values
        :param label: (optional) a label for the output feature
          :class:`Series <pandas.Series>`
        """
        super(ExactMatch, self).__init__(left_on, right_on, label=label)

        self.match_value = match_value
        self.non_match_value = non_match_value
        self.missing_value = missing_value

    def _compute_vectorized(self, source_column, target_column):
        concatenated = pd.Series(list(zip(source_column, target_column)))

        def exact_apply(pair):
            if _pair_has_any_null(pair):
                LOGGER.debug(
                    "Can't compare, the pair contains null values: %s", pair
                )
                return np.nan

            scores = []
            for source in pair[0]:
                for target in pair[1]:
                    if pd.isna(source) or pd.isna(target):
                        scores.append(self.missing_value)
                        continue
                    if source == target:
                        scores.append(self.match_value)
                    else:
                        scores.append(self.non_match_value)

            return max(scores)

        return fillna(concatenated.apply(exact_apply), self.missing_value)

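# A minimal usage sketch for `ExactMatch` (the column label is illustrative,
# not an actual soweego column name). Since cells hold lists, a pair like
# (['http://a.org', 'http://b.org'], ['http://b.org']) scores `match_value`,
# because at least one element pair matches exactly:
#
#   >>> import recordlinkage as rl
#   >>> extractor = rl.Compare()
#   >>> extractor.add(ExactMatch('url', 'url', label='url_exact'))
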
class SimilarStrings(BaseCompareFeature):
    """Compare pairs of lists holding **strings**
    through similarity measures on each pair of elements.
    """

    name = 'similar_strings'
    description = (
        'Compare pairs of lists holding strings '
        'through similarity measures on each pair of elements'
    )

    def __init__(
        self,
        left_on: str,
        right_on: str,
        algorithm: str = 'levenshtein',
        threshold: float = None,
        missing_value: float = constants.FEATURE_MISSING_VALUE,
        analyzer: str = None,
        ngram_range: Tuple[int, int] = (2, 2),
        label: str = None,
    ):
        """
        :param left_on: a Wikidata :class:`DataFrame <pandas.DataFrame>`
          column label
        :param right_on: a target catalog :class:`DataFrame <pandas.DataFrame>`
          column label
        :param algorithm: (optional) ``{'cosine', 'levenshtein'}``.
          A string similarity algorithm, either the
          `cosine similarity <https://en.wikipedia.org/wiki/Cosine_similarity>`_
          or the
          `Levenshtein distance <https://en.wikipedia.org/wiki/Levenshtein_distance>`_
          respectively
        :param threshold: (optional) a threshold to filter features
          with a lower or equal score
        :param missing_value: (optional) a score to fill null values
        :param analyzer: (optional, only applies when *algorithm='cosine'*)
          ``{'soweego', 'word', 'char', 'char_wb'}``.
          A text analyzer to preprocess input. It is passed to the
          *analyzer* parameter of
          :class:`sklearn.feature_extraction.text.CountVectorizer`.

          - ``'soweego'`` is :func:`soweego.commons.text_utils.tokenize`
          - ``{'word', 'char', 'char_wb'}`` are *scikit* built-ins. See
            `here <https://scikit-learn.org/stable/modules/feature_extraction.html#limitations-of-the-bag-of-words-representation>`_
            for more details
          - ``None`` is :meth:`str.split`, and means input is already
            preprocessed
        :param ngram_range: (optional, only applies when *algorithm='cosine'*
          and *analyzer* is not *'soweego'*). Lower and upper boundary for
          n-gram extraction, passed to
          :class:`CountVectorizer <sklearn.feature_extraction.text.CountVectorizer>`
        :param label: (optional) a label for the output feature
          :class:`Series <pandas.Series>`
        """
        super(SimilarStrings, self).__init__(left_on, right_on, label=label)

        self.algorithm = algorithm
        self.threshold = threshold
        self.missing_value = missing_value
        self.analyzer = analyzer
        self.ngram_range = ngram_range

    def _compute_vectorized(self, source_column, target_column):
        if self.algorithm == 'levenshtein':
            algorithm = self.levenshtein_similarity
        elif self.algorithm == 'cosine':
            algorithm = self.cosine_similarity
        else:
            err_msg = (
                f'Bad string similarity algorithm: {self.algorithm}. '
                f"Please use one of ('levenshtein', 'cosine')"
            )
            LOGGER.critical(err_msg)
            raise ValueError(err_msg)

        compared = algorithm(source_column, target_column)
        compared_filled = fillna(compared, self.missing_value)

        if self.threshold is None:
            return compared_filled

        return (compared_filled >= self.threshold).astype(np.float64)

    # Adapted from
    # https://github.com/J535D165/recordlinkage/blob/master/recordlinkage/algorithms/string.py
    # Maximum edit distance among the list of values.
    # TODO low scores if name is swapped with surname,
    #  see https://github.com/Wikidata/soweego/issues/175
    def levenshtein_similarity(self, source_column, target_column):
        paired = pd.Series(list(zip(source_column, target_column)))

        def levenshtein_apply(pair):
            if _pair_has_any_null(pair):
                LOGGER.debug(
                    "Can't compute Levenshtein distance, "
                    "the pair contains null values: %s",
                    pair,
                )
                return np.nan

            scores = []
            source_list, target_list = pair

            for source in source_list:
                for target in target_list:
                    try:
                        score = 1 - jellyfish.levenshtein_distance(
                            source, target
                        ) / np.max([len(source), len(target)])
                        scores.append(score)
                    except TypeError:
                        if pd.isnull(source) or pd.isnull(target):
                            scores.append(self.missing_value)
                        else:
                            raise

            return max(scores)

        return paired.apply(levenshtein_apply)

    def cosine_similarity(self, source_column, target_column):
        if len(source_column) != len(target_column):
            raise ValueError('Columns must have the same length')
        if len(source_column) == len(target_column) == 0:
            LOGGER.warning(
                "Can't compute cosine similarity, columns are empty"
            )
            return pd.Series(np.nan)

        # This algorithm requires strings as input, but lists are expected
        source_column, target_column = (
            source_column.str.join(' '),
            target_column.str.join(' '),
        )

        # No analyzer means input underwent `commons.text_utils.tokenize`
        if self.analyzer is None:
            vectorizer = CountVectorizer(analyzer=str.split)
        elif self.analyzer == 'soweego':
            vectorizer = CountVectorizer(analyzer=text_utils.tokenize)
        # scikit-learn built-ins.
        # `char` and `char_wb` make CHARACTER n-grams, instead of WORD ones:
        # they may be useful for short strings with misspellings.
        # `char_wb` makes n-grams INSIDE words,
        # possibly padding with whitespace. See
        # https://scikit-learn.org/stable/modules/feature_extraction.html#limitations-of-the-bag-of-words-representation
        elif self.analyzer in ('word', 'char', 'char_wb'):
            vectorizer = CountVectorizer(
                analyzer=self.analyzer,
                strip_accents='unicode',
                ngram_range=self.ngram_range,
            )
        else:
            err_msg = (
                f'Bad text analyzer: {self.analyzer}. '
                f"Please use one of ('soweego', 'word', 'char', 'char_wb')"
            )
            LOGGER.critical(err_msg)
            raise ValueError(err_msg)

        data = source_column.append(target_column).fillna('')

        try:
            vectors = vectorizer.fit_transform(data)
        except ValueError as ve:
            LOGGER.warning(
                'Failed transforming text into vectors, reason: %s. Text: %s',
                ve,
                data,
            )
            return pd.Series(np.nan)

        def _metric_sparse_cosine(u, v):
            a = np.sqrt(u.multiply(u).sum(axis=1))
            b = np.sqrt(v.multiply(v).sum(axis=1))
            ab = v.multiply(u).sum(axis=1)
            cosine = np.divide(ab, np.multiply(a, b)).A1
            return cosine

        return _metric_sparse_cosine(
            vectors[: len(source_column)], vectors[len(source_column):]
        )

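# Two illustrative `SimilarStrings` configurations (column labels are
# hypothetical): Levenshtein similarity on names, and cosine similarity on
# pre-tokenized descriptions via the 'soweego' analyzer. The second one also
# shows `threshold`: scores at or above it become 1.0, the rest 0.0.
#
#   >>> extractor.add(SimilarStrings('name', 'name', algorithm='levenshtein'))
#   >>> extractor.add(
#   ...     SimilarStrings(
#   ...         'description', 'description',
#   ...         algorithm='cosine', analyzer='soweego', threshold=0.5,
#   ...     )
#   ... )
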
class SimilarDates(BaseCompareFeature):
    """Compare pairs of lists holding **dates**
    through match by maximum shared precision.
    """

    name = 'similar_dates'
    description = (
        'Compare pairs of lists holding dates '
        'through match by maximum shared precision'
    )

    def __init__(
        self,
        left_on: str,
        right_on: str,
        missing_value: float = constants.FEATURE_MISSING_VALUE,
        label: str = None,
    ):
        """
        :param left_on: a Wikidata :class:`DataFrame <pandas.DataFrame>`
          column label
        :param right_on: a target catalog :class:`DataFrame <pandas.DataFrame>`
          column label
        :param missing_value: (optional) a score to fill null values
        :param label: (optional) a label for the output feature
          :class:`Series <pandas.Series>`
        """
        super(SimilarDates, self).__init__(left_on, right_on, label=label)

        self.missing_value = missing_value

    def _compute_vectorized(self, source_column, target_column):
        concatenated = pd.Series(list(zip(source_column, target_column)))

        def check_date_equality(pair: Tuple[List[pd.Period], List[pd.Period]]):
            if _pair_has_any_null(pair):
                LOGGER.debug(
                    "Can't compare dates, the pair contains null values: %s",
                    pair,
                )
                return np.nan

            source_list, target_list = pair

            # Keep track of the best score
            best = 0

            for source, target in itertools.product(source_list, target_list):
                # Get the precision number of both dates
                s_precision = constants.PD_PERIOD_PRECISIONS.index(
                    source.freq.name
                )
                t_precision = constants.PD_PERIOD_PRECISIONS.index(
                    target.freq.name
                )

                # Minimum pair precision = maximum shared precision
                lowest_prec = min(s_precision, t_precision)

                # Result for the current `source`
                current_result = 0

                # Loop through the `pandas.Period` attributes we can compare
                # and the precision each attribute requires.
                # We go from lowest to highest precision: if a lower-precision
                # attribute (e.g., year) doesn't match, the dates don't match
                # at all, and higher-precision attributes are not checked.
                for min_required_prec, d_attr in enumerate(
                    ['year', 'month', 'day', 'hour', 'minute', 'second']
                ):
                    # Compare the current attribute only if both `source` and
                    # `target` have enough precision for it. If it matches,
                    # add 1 to `current_result`; otherwise stop.
                    if lowest_prec >= min_required_prec and getattr(
                        source, d_attr
                    ) == getattr(target, d_attr):
                        current_result += 1
                    else:
                        break

                # We want a score between 0 (no match) and 1 (perfect match),
                # so we divide `current_result` by the number of attributes
                # actually compared. Since dates have variable precision and
                # the minimum shared precision can be 0 (the year),
                # we add 1 to `lowest_prec`.
                best = max(best, current_result / (lowest_prec + 1))

            return best

        return fillna(
            concatenated.apply(check_date_equality), self.missing_value
        )

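# A worked example of the `SimilarDates` scoring, assuming
# `constants.PD_PERIOD_PRECISIONS` orders `pandas.Period` frequency names
# from year to second (values below are illustrative):
#
#   >>> import pandas as pd
#   >>> source = pd.Period('1984-03', freq='M')
#   >>> target = pd.Period('1984-03-17', freq='D')
#
# The maximum shared precision is the month, so only year and month are
# compared: both match, hence the score is 2 / 2 = 1.0. If the months
# differed, only the year would match and the score would be 1 / 2 = 0.5.
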
class SharedTokens(BaseCompareFeature):
    """Compare pairs of lists holding **string tokens**
    through weighted intersection.
    """

    name = 'shared_tokens'
    description = (
        'Compare pairs of lists holding string tokens '
        'through weighted intersection'
    )

    def __init__(
        self,
        left_on: str,
        right_on: str,
        missing_value: float = constants.FEATURE_MISSING_VALUE,
        label: str = None,
    ):
        """
        :param left_on: a Wikidata :class:`DataFrame <pandas.DataFrame>`
          column label
        :param right_on: a target catalog :class:`DataFrame <pandas.DataFrame>`
          column label
        :param missing_value: (optional) a score to fill null values
        :param label: (optional) a label for the output feature
          :class:`Series <pandas.Series>`
        """
        super(SharedTokens, self).__init__(left_on, right_on, label=label)

        self.missing_value = missing_value

    def _compute_vectorized(self, source_column, target_column):
        concatenated = pd.Series(list(zip(source_column, target_column)))

        def intersection_percentage_size(pair):
            if _pair_has_any_null(pair):
                LOGGER.debug(
                    "Can't compare tokens, the pair contains null values: %s",
                    pair,
                )
                return np.nan

            source_list, target_list = pair
            source_set, target_set = set(source_list), set()

            for value in target_list:
                if value:
                    target_set.update(filter(None, value.split()))

            intersection = source_set.intersection(target_set)
            count_intersect = len(intersection)
            count_total = len(source_set.union(target_set))

            # Penalize band stop words
            count_low_score_words = len(
                text_utils.BAND_NAME_LOW_SCORE_WORDS.intersection(intersection)
            )

            return (
                (count_intersect - (count_low_score_words * 0.9)) / count_total
                if count_total > 0
                else np.nan
            )

        return fillna(
            concatenated.apply(intersection_percentage_size),
            self.missing_value,
        )

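# `SharedTokens` scoring in a nutshell: given the source token set S and the
# target token set T (target values are split on whitespace first), the score
# is (|S ∩ T| - 0.9 * |low-score words in S ∩ T|) / |S ∪ T|, where the
# low-score words come from `text_utils.BAND_NAME_LOW_SCORE_WORDS`.
# A hypothetical usage sketch (the column label is illustrative):
#
#   >>> extractor.add(SharedTokens('name_tokens', 'name_tokens'))
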
class SharedOccupations(BaseCompareFeature):
    """Compare pairs of lists holding **occupation QIDs**
    *(ontology classes)* through expansion of the class hierarchy,
    plus intersection of values.
    """

    name = 'shared_occupations'
    description = (
        'Compare pairs of lists holding occupation QIDs '
        'through expansion of the class hierarchy, plus intersection of values'
    )

    def __init__(
        self,
        left_on: str,
        right_on: str,
        missing_value: float = 0.0,
        label: str = None,
    ):
        """
        :param left_on: a Wikidata :class:`DataFrame <pandas.DataFrame>`
          column label
        :param right_on: a target catalog :class:`DataFrame <pandas.DataFrame>`
          column label
        :param missing_value: (optional) a score to fill null values
        :param label: (optional) a label for the output feature
          :class:`Series <pandas.Series>`
        """
        super(SharedOccupations, self).__init__(left_on, right_on, label=label)

        global _global_occupations_qid_cache

        self.missing_value = missing_value
        self._expand_occupations_cache = _global_occupations_qid_cache

    # This should be applied to a `pandas.Series`, where each element
    # is a set of QIDs.
    # This function will expand the set to include all superclasses and
    # subclasses of each QID in the original set.
    def _expand_occupations(self, occupation_qids: Set[str]) -> Set[str]:
        expanded_set = set()

        for qid in occupation_qids:
            expanded_set.add(qid)

            # Check whether the subclasses and superclasses
            # of this specific QID are cached in memory
            if qid in self._expand_occupations_cache:
                # If so, add them to the expanded set
                expanded_set |= self._expand_occupations_cache[qid]
            # Otherwise, get them from Wikidata,
            # then add them to the cache and to `expanded_set`
            else:
                subclasses = sparql_queries.subclasses_of(qid)
                superclasses = sparql_queries.superclasses_of(qid)
                joined = subclasses | superclasses

                self._expand_occupations_cache[qid] = joined
                expanded_set |= joined

        return expanded_set

    def _compute_vectorized(
        self, source_column: pd.Series, target_column: pd.Series
    ):
        # Add the superclasses and subclasses of each occupation
        # to the target column
        target_column = target_column.apply(self._expand_occupations)

        concatenated = pd.Series(list(zip(source_column, target_column)))

        # Given 2 sets, return the percentage of items
        # that the smaller set shares with the larger set
        def check_occupation_equality(pair: Tuple[Set[str], Set[str]]):
            if _pair_has_any_null(pair):
                LOGGER.debug(
                    "Can't compare occupations, "
                    "the pair contains null values: %s",
                    pair,
                )
                return np.nan

            s_item, t_item = pair

            min_length = min(len(s_item), len(t_item))
            n_shared_items = len(s_item & t_item)

            return n_shared_items / min_length

        return fillna(
            concatenated.apply(check_occupation_equality), self.missing_value
        )

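# An illustrative `SharedOccupations` comparison: suppose Wikidata holds
# {'Q177220'} (singer) and the target catalog holds {'Q639669'} (musician).
# The target set is expanded with super- and subclasses of Q639669; at the
# time of writing, that expansion includes Q177220 (singer is a subclass of
# musician), so the smaller set shares all of its items and the score is 1.0.
# Usage sketch (column labels are hypothetical):
#
#   >>> extractor.add(SharedOccupations('occupations', 'occupations'))
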
class SharedTokensPlus(BaseCompareFeature):
    """Compare pairs of lists holding **string tokens**
    through weighted intersection.

    This feature is similar to :class:`SharedTokens`,
    but has extra functionality:

    - handles arbitrary stop words
    - accepts nested lists of tokens
    - the output score is the percentage of tokens in the smaller set
      which are shared by both sets
    """

    name = 'shared_tokens_plus'
    description = (
        'Compare pairs of lists holding string tokens '
        'through weighted intersection'
    )

    def __init__(
        self,
        left_on: str,
        right_on: str,
        missing_value: float = constants.FEATURE_MISSING_VALUE,
        label: str = None,
        stop_words: Set = None,
    ):
        """
        :param left_on: a Wikidata :class:`DataFrame <pandas.DataFrame>`
          column label
        :param right_on: a target catalog :class:`DataFrame <pandas.DataFrame>`
          column label
        :param missing_value: (optional) a score to fill null values
        :param label: (optional) a label for the output feature
          :class:`Series <pandas.Series>`
        :param stop_words: (optional) a set of
          `stop words <https://en.wikipedia.org/wiki/Stop_words>`_
          to be filtered from input pairs
        """
        super(SharedTokensPlus, self).__init__(left_on, right_on, label=label)

        self.missing_value = missing_value
        self.stop_words = stop_words

    @staticmethod
    def _flatten(list_to_flatten: List) -> List:
        to_process = [list_to_flatten]
        result = []

        while len(to_process) != 0:
            current = to_process.pop()
            for child in current:
                if isinstance(child, List):
                    to_process.append(child)
                else:
                    result.append(child)

        return result

    def _compute_vectorized(
        self, source_column: pd.Series, target_column: pd.Series
    ) -> pd.Series:
        # Concatenate the columns for easier processing:
        # each element in `concatenated` is a pair of token lists
        concatenated = pd.Series(list(zip(source_column, target_column)))

        # Compute shared tokens after filtering stop words
        def compare_apply(pair: Tuple[List[str], List[str]]) -> float:
            if _pair_has_any_null(pair):
                LOGGER.debug(
                    "Can't compare, the pair contains null values: %s", pair
                )
                return np.nan

            # Clean up the pair a bit: lowercase everything, split on
            # whitespace, and flatten the result into a plain list
            pair = [
                self._flatten([el.lower().split() for el in p]) for p in pair
            ]
            s_item, t_item = pair

            # Finally, convert to sets
            s_item = set(s_item)
            t_item = set(t_item)

            if self.stop_words:
                s_item -= self.stop_words
                t_item -= self.stop_words

            min_length = min(len(s_item), len(t_item))
            n_shared_items = len(s_item & t_item)

            # Prevent division by 0
            if min_length != 0:
                return n_shared_items / min_length
            else:
                return np.nan

        return fillna(concatenated.apply(compare_apply), self.missing_value)

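# A hypothetical `SharedTokensPlus` configuration with custom stop words
# (column labels and the stop word set are illustrative):
#
#   >>> extractor.add(
#   ...     SharedTokensPlus(
#   ...         'name_tokens', 'name_tokens',
#   ...         stop_words={'the', 'band', 'orchestra'},
#   ...     )
#   ... )
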
def _pair_has_any_null(pair):
    if not all(pair):
        return True

    source_is_null, target_is_null = pd.isna(pair[0]), pd.isna(pair[1])

    if isinstance(source_is_null, np.ndarray):
        source_is_null = source_is_null.all()
    if isinstance(target_is_null, np.ndarray):
        target_is_null = target_is_null.all()

    if source_is_null or target_is_null:
        return True

    return False