#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Run supervised linkers."""
__author__ = 'Marco Fossati'
__email__ = 'fossati@spaziodati.eu'
__version__ = '1.0'
__license__ = 'GPL-3.0'
__copyright__ = 'Copyleft 2018, Hjfocs'
import logging
import os
import sys
from re import search
from typing import Iterator, Tuple
import click
import joblib
import pandas as pd
import recordlinkage as rl
from keras import backend as K
from numpy import full, nan
from soweego.commons import constants, keys, target_database
from soweego.ingester import wikidata_bot
from soweego.linker import blocking, classifiers, workflow
LOGGER = logging.getLogger(__name__)
@click.command()
@click.argument('classifier', type=click.Choice(constants.CLASSIFIERS))
@click.argument('catalog', type=click.Choice(target_database.supported_targets()))
@click.argument('entity', type=click.Choice(target_database.supported_entities()))
@click.option(
'-t',
'--threshold',
default=constants.CONFIDENCE_THRESHOLD,
help=f"Probability score threshold, default: {constants.CONFIDENCE_THRESHOLD}.",
)
@click.option(
'-n',
'--name-rule',
is_flag=True,
help='Activate post-classification rule on full names: links with different full names will be filtered.',
)
@click.option('-u', '--upload', is_flag=True, help='Upload links to Wikidata.')
@click.option(
'-s',
'--sandbox',
is_flag=True,
help='Perform all edits on the Wikidata sandbox item Q4115189.',
)
@click.option(
'-d',
'--dir-io',
type=click.Path(file_okay=False),
default=constants.WORK_DIR,
help=f'Input/output directory, default: {constants.WORK_DIR}.',
)
def cli(classifier, catalog, entity, threshold, name_rule, upload, sandbox, dir_io):
"""Run a supervised linker.
Build the classification set relevant to the given catalog and entity,
then generate links between Wikidata items and catalog identifiers.
Output a gzipped CSV file, format: QID,catalog_ID,confidence_score
You can pass the '-u' flag to upload the output to Wikidata.
A trained model must exist for the given classifier, catalog, entity.
To do so, use:
$ python -m soweego linker train
"""
actual_classifier = constants.CLASSIFIERS[classifier]
model_path, result_path = _handle_io(actual_classifier, catalog, entity, dir_io)
# Exit if the model file doesn't exist
if model_path is None:
sys.exit(1)
rl.set_option(*constants.CLASSIFICATION_RETURN_SERIES)
for i, chunk in enumerate(
execute(model_path, catalog, entity, threshold, name_rule, dir_io)
):
chunk.to_csv(result_path, mode='a', header=False)
if upload:
_upload(chunk, i, catalog, entity, sandbox)
# Free memory in case of neural networks:
# can be done only after classification
if actual_classifier in (
keys.SINGLE_LAYER_PERCEPTRON,
keys.MULTI_LAYER_PERCEPTRON,
keys.VOTING_CLASSIFIER,
keys.GATED_CLASSIFIER,
keys.STACKED_CLASSIFIER,
):
K.clear_session() # Clear the TensorFlow graph
LOGGER.info('Linking completed')
[docs]def execute(
model_path: str,
catalog: str,
entity: str,
threshold: float,
name_rule: bool,
dir_io: str,
) -> Iterator[pd.Series]:
"""Run a supervised linker.
1. Build the classification set relevant to the given catalog and entity
2. generate links between Wikidata items and catalog identifiers
:param model_path: path to a trained model file
:param catalog: ``{'discogs', 'imdb', 'musicbrainz'}``.
A supported catalog
:param entity: ``{'actor', 'band', 'director', 'musician', 'producer',
'writer', 'audiovisual_work', 'musical_work'}``.
A supported entity
:param threshold: minimum confidence score for generated links.
Those below this value are discarded.
Must be a float between 0 and 1
:param name_rule: whether to enable the rule on full names or not:
if *True*, links with different full names
are discarded after classification
:param dir_io: input/output directory where working files
will be read/written
:return: the generator yielding chunks of links
"""
classifier = joblib.load(model_path)
for (
wd_chunk,
target_chunk,
feature_vectors,
) in _classification_set_generator(catalog, entity, dir_io):
# The classification set must have the same feature space
# as the training one
_add_missing_feature_columns(classifier, feature_vectors)
predictions = (
# LSVM doesn't support probability scores
classifier.predict(feature_vectors)
if isinstance(classifier, rl.SVMClassifier)
else classifier.prob(feature_vectors)
)
predictions = _apply_linking_rules(
name_rule, predictions, target_chunk, wd_chunk
)
yield _get_unique_predictions_above_threshold(predictions, threshold)
def _classification_set_generator(
catalog, entity, dir_io
) -> Iterator[Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]]:
goal = 'classification'
# Wikidata side
wd_reader = workflow.build_wikidata(goal, catalog, entity, dir_io)
wd_generator = workflow.preprocess_wikidata(goal, wd_reader)
for i, wd_chunk in enumerate(wd_generator, 1):
# Collect samples via queries to the target DB
samples = blocking.find_samples(
goal,
catalog,
wd_chunk[keys.NAME_TOKENS],
i,
target_database.get_main_entity(catalog, entity),
dir_io,
)
# Build target chunk from samples
target_reader = workflow.build_target(
goal, catalog, entity, set(samples.get_level_values(keys.TID))
)
# Preprocess target chunk
target_chunk = workflow.preprocess_target(goal, target_reader)
# Extract features
features_path = os.path.join(
dir_io, constants.FEATURES.format(catalog, entity, goal, i)
)
feature_vectors = workflow.extract_features(
samples, wd_chunk, target_chunk, features_path
)
yield wd_chunk, target_chunk, feature_vectors
LOGGER.info('Chunk %d classified', i)
def _apply_linking_rules(name_rule, predictions, target_chunk, wd_chunk):
# Full name rule: if names differ, it's not a link
if name_rule:
LOGGER.info('Applying full names rule ...')
predictions = pd.DataFrame(predictions).apply(
_zero_when_different_names, axis=1, args=(wd_chunk, target_chunk)
)
# Wikidata URL rule: if the target ID has a Wikidata URL, it's a link
if target_chunk.get(keys.URL) is not None:
predictions = pd.DataFrame(predictions).apply(
_one_when_wikidata_link_correct, axis=1, args=(target_chunk,)
)
return predictions
def _get_unique_predictions_above_threshold(predictions, threshold) -> pd.DataFrame:
# Filter by threshold
above_threshold = predictions[predictions >= threshold]
# Remove duplicates
return above_threshold[~above_threshold.index.duplicated()]
def _handle_io(classifier, catalog, entity, dir_io):
# Build the output paths upon catalog, entity, and classifier args
model_path = os.path.join(
dir_io, constants.LINKER_MODEL.format(catalog, entity, classifier)
)
result_path = os.path.join(
dir_io, constants.LINKER_RESULT.format(catalog, entity, classifier)
)
if not os.path.isfile(model_path):
LOGGER.critical(
"Trained model not found at '%s'. "
"Please run 'python -m soweego linker train %s %s %s'",
model_path,
classifier,
catalog,
entity,
)
return None, None
# Delete existing result file,
# otherwise the current output would be appended to it
if os.path.isfile(result_path):
LOGGER.warning("Will delete old output file found at '%s' ...", result_path)
os.remove(result_path)
os.makedirs(os.path.dirname(result_path), exist_ok=True)
return model_path, result_path
def _upload(chunk, chunk_number, catalog, entity, sandbox):
links = dict(chunk.to_dict().keys())
LOGGER.info('Starting upload of links to Wikidata, chunk %d ...', chunk_number)
wikidata_bot.add_identifiers(links, catalog, entity, sandbox)
LOGGER.info('Upload to Wikidata completed, chunk %d', chunk_number)
def _add_missing_feature_columns(classifier, feature_vectors: pd.DataFrame):
# Handle amount of features depending on the classifier
expected_features: int
if isinstance(classifier, rl.NaiveBayesClassifier):
# This seems to be the only easy way for Naïve Bayes
expected_features = len(classifier.kernel._binarizers)
elif isinstance(classifier, rl.LogisticRegressionClassifier):
expected_features = classifier.kernel.coef_.shape[1]
elif isinstance(classifier, classifiers.SVCClassifier):
expected_features = classifier.kernel.shape_fit_[1]
elif isinstance(classifier, rl.SVMClassifier):
expected_features = classifier.kernel.coef_.shape[1]
elif isinstance(classifier, classifiers.RandomForest):
expected_features = classifier.kernel.n_features_
elif isinstance(
classifier,
(
classifiers.SingleLayerPerceptron,
classifiers.MultiLayerPerceptron,
classifiers.VotingClassifier,
classifiers.GatedEnsembleClassifier,
classifiers.StackedEnsembleClassifier,
),
):
expected_features = classifier.num_features
else:
err_msg = (
f'Unsupported classifier: {classifier.__class__.__name__}. '
f'It should be one of {set(constants.CLASSIFIERS)}'
)
LOGGER.critical(err_msg)
raise ValueError(err_msg)
actual_features = feature_vectors.shape[1]
if expected_features != actual_features:
LOGGER.info(
'Feature vectors have %d features, but %s expected %d. '
'Will add missing ones',
actual_features,
classifier.__class__.__name__,
expected_features,
)
for i in range(expected_features - actual_features):
feature_vectors[f'missing_{i}'] = full(
len(feature_vectors), constants.FEATURE_MISSING_VALUE
)
# See https://stackoverflow.com/a/18317089/10719765
# for `pandas` implementation details
def _zero_when_different_names(prediction, wikidata, target):
wd_names, target_names = set(), set()
qid, tid = prediction.name
for column in constants.NAME_FIELDS:
if wikidata.get(column) is not None:
values = wikidata.loc[qid][column]
if values is not nan:
wd_names.update(set(values))
if target.get(column) is not None:
values = target.loc[tid][column]
if values is not nan:
target_names.update(set(values))
return 0.0 if wd_names.isdisjoint(target_names) else prediction[0]
def _one_when_wikidata_link_correct(prediction, target):
qid, tid = prediction.name
urls = target.loc[tid][keys.URL]
if urls:
for url in urls:
if url and 'wikidata' in url:
has_qid = search(constants.QID_REGEX, url)
if has_qid:
new_score = 1.0 if qid == has_qid.group() else 0.0
LOGGER.debug(
'Wikidata URL detected in %s target ID: %s '
'Will update the confidence score from %f to %f',
(qid, tid),
url,
prediction[0],
new_score,
)
return new_score
return prediction[0]