#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""`Record linkage <https://en.wikipedia.org/wiki/Record_linkage>`_ workflow.
It is a pipeline composed of the following main steps:
1. build the Wikidata (:func:`build_wikidata`)
and target (:func:`build_target`) datasets
2. preprocess both (:func:`preprocess_wikidata` and :func:`preprocess_target`)
3. extract features by comparing pairs of Wikidata and target values
(:func:`extract_features`)
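
A minimal sketch of how the first two steps can be chained for a training run
(catalog, entity, and I/O path are illustrative)::

    wd_reader = build_wikidata('training', 'discogs', 'musician', '/tmp/io')
    wd_chunks = preprocess_wikidata('training', wd_reader)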
"""
import datetime
import gzip
import json
import logging
import os
from multiprocessing import cpu_count
from typing import Iterator, Set
import pandas as pd
import recordlinkage as rl
from numpy import nan
from pandas import read_sql
from pandas.io.json._json import JsonReader
from sqlalchemy.orm import Query
from soweego.commons import (
constants, data_gathering, keys, target_database, text_utils, url_utils, utils
)
from soweego.commons.db_manager import DBManager
from soweego.commons.logging import log_dataframe_info
from soweego.linker import features
from soweego.wikidata import api_requests, vocabulary
__author__ = 'Marco Fossati'
__email__ = 'fossati@spaziodati.eu'
__version__ = '1.0'
__license__ = 'GPL-3.0'
__copyright__ = 'Copyleft 2018, Hjfocs'
LOGGER = logging.getLogger(__name__)
def build_wikidata(
    goal: str, catalog: str, entity: str, dir_io: str
) -> JsonReader:
"""Build a Wikidata dataset for training or classification purposes:
workflow step 1.
Data is gathered from the
`SPARQL endpoint <https://query.wikidata.org/>`_ and the
`Web API <https://www.wikidata.org/w/api.php>`_.
**How it works:**
1. gather relevant Wikidata items that *hold* (for *training*)
or *lack* (for *classification*) identifiers of the given catalog
2. gather relevant item data
3. dump the dataset to a gzipped `JSON Lines <http://jsonlines.org/>`_ file
4. read the dataset into a generator of :class:`pandas.DataFrame` chunks
for memory-efficient processing
:param goal: ``{'training', 'classification'}``.
Whether to build a dataset for training or classification
:param catalog: ``{'discogs', 'imdb', 'musicbrainz'}``.
A supported catalog
:param entity: ``{'actor', 'band', 'director', 'musician', 'producer',
'writer', 'audiovisual_work', 'musical_work'}``.
A supported entity
:param dir_io: input/output directory where working files
will be read/written
:return: the generator yielding :class:`pandas.DataFrame` chunks
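
    A hypothetical call for a Discogs musician training set
    (the I/O directory is illustrative)::

        wd_reader = build_wikidata('training', 'discogs', 'musician', '/tmp/io')
        for chunk in wd_reader:  # chunks of 1,000 records each
            ...  # process each pandas.DataFrame chunk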
"""
qids_and_tids, wd_io_path = _handle_goal(goal, catalog, entity, dir_io)
catalog_pid = target_database.get_catalog_pid(catalog, entity)
if not os.path.isfile(wd_io_path):
LOGGER.info(
"Building Wikidata %s set for %s %s, output file '%s' ...",
goal,
catalog,
entity,
wd_io_path,
)
# Make working folders
os.makedirs(os.path.dirname(wd_io_path), exist_ok=True)
# 1. Gather Wikidata items
if goal == 'training':
# WITH target IDs
data_gathering.gather_target_ids(
entity, catalog, catalog_pid, qids_and_tids
)
qids = qids_and_tids.keys()
elif goal == 'classification':
# WITHOUT target IDs
qids = data_gathering.gather_qids(entity, catalog, catalog_pid)
# 2. Collect relevant data, and 3. dump to gzipped JSON Lines
url_pids, ext_id_pids_to_urls = data_gathering.gather_relevant_pids()
with gzip.open(wd_io_path, 'wt') as wd_io:
api_requests.get_data_for_linker(
catalog,
entity,
qids,
url_pids,
ext_id_pids_to_urls,
qids_and_tids,
wd_io,
)
# Cached dataset, for development purposes
else:
LOGGER.info("Will reuse existing Wikidata %s set: '%s'", goal, wd_io_path)
if goal == 'training':
_reconstruct_qids_and_tids(wd_io_path, qids_and_tids)
LOGGER.info('Wikidata %s set built', goal)
return pd.read_json(wd_io_path, lines=True, chunksize=1000)
def build_target(
goal: str, catalog: str, entity: str, identifiers: Set[str]
) -> Iterator[pd.DataFrame]:
"""Build a target catalog dataset for training or classification purposes:
workflow step 1.
Data is gathered by querying the ``s51434__mixnmatch_large_catalogs_p``
database. This is where the :mod:`importer` inserts processed catalog dumps.
The database is located in
`ToolsDB <https://wikitech.wikimedia.org/wiki/Help:Toolforge/Database#User_databases>`_
under the Wikimedia
`Toolforge <https://wikitech.wikimedia.org/wiki/Portal:Toolforge>`_ infrastructure.
See `how to connect <https://wikitech.wikimedia.org/wiki/Help:Toolforge/Database#Connecting_to_the_database_replicas>`_.
:param goal: ``{'training', 'classification'}``.
Whether to build a dataset for training or classification
:param catalog: ``{'discogs', 'imdb', 'musicbrainz'}``.
A supported catalog
:param entity: ``{'actor', 'band', 'director', 'musician', 'producer',
'writer', 'audiovisual_work', 'musical_work'}``.
A supported entity
:param identifiers: a set of catalog IDs to gather data for
:return: the generator yielding :class:`pandas.DataFrame` chunks
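
    A hypothetical call for a couple of Discogs musician identifiers
    (the ID values are illustrative)::

        ids = {'139578', '3052767'}
        for chunk in build_target('classification', 'discogs', 'musician', ids):
            ...  # process each pandas.DataFrame chunk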
"""
utils.check_goal_value(goal)
LOGGER.info('Building target %s set for %s %s ...', goal, catalog, entity)
# Target catalog ORM entities/DB tables
base, link, nlp = (
target_database.get_main_entity(catalog, entity),
target_database.get_link_entity(catalog, entity),
target_database.get_nlp_entity(catalog, entity),
)
tables = [table for table in (base, link, nlp) if table]
# Initial query with all non-null tables
query = Query(tables)
# Remove `base` to avoid outer join with itself
tables.remove(base)
# Outer joins
for table in tables:
query = query.outerjoin(table, base.catalog_id == table.catalog_id)
# Condition
query = query.filter(base.catalog_id.in_(identifiers)).enable_eagerloads(False)
sql = query.statement
LOGGER.debug('SQL query to be fired: %s', sql)
# Avoid loading query result in memory
db_engine = DBManager().get_engine().execution_options(stream_results=True)
return read_sql(sql, db_engine, chunksize=1000)
def preprocess_wikidata(
goal: str, wikidata_reader: JsonReader
) -> Iterator[pd.DataFrame]:
"""Preprocess a Wikidata dataset: workflow step 2.
This function consumes :class:`pandas.DataFrame` chunks and
should be pipelined after :func:`build_wikidata`.
**Preprocessing actions:**
1. set QIDs as :class:`pandas.core.indexes.base.Index` of the chunk
2. drop columns with null values only
3. *(training)* ensure one target ID per QID
4. tokenize names, URLs, genres, when applicable
5. *(shared with* :func:`preprocess_target` *)*
normalize columns with names, occupations, dates, when applicable
:param goal: ``{'training', 'classification'}``.
Whether the dataset is for training or classification
:param wikidata_reader: a dataset reader as returned by
:func:`build_wikidata`
:return: the generator yielding preprocessed
:class:`pandas.DataFrame` chunks
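
    A minimal sketch of the intended pipelining
    (argument values are illustrative)::

        wd_reader = build_wikidata('training', 'discogs', 'musician', '/tmp/io')
        for chunk in preprocess_wikidata('training', wd_reader):
            ...  # each chunk is a preprocessed pandas.DataFrame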
"""
utils.check_goal_value(goal)
LOGGER.info('Preprocessing Wikidata %s set ...', goal)
for i, chunk in enumerate(wikidata_reader, 1):
# 1. QID as index
chunk.set_index(keys.QID, inplace=True)
log_dataframe_info(LOGGER, chunk, f"Built index from '{keys.QID}' column")
# 2. Drop columns with null values only
_drop_null_columns(chunk)
# 3. Training only: ensure 1 target ID
if goal == 'training':
            # This wipes out QIDs with more than 1 positive sample,
            # but the impact is negligible
chunk[keys.TID] = chunk[keys.TID].map(
lambda cell: cell[0] if isinstance(cell, list) else cell
)
# 4. Tokenize names
for column in constants.NAME_FIELDS:
if chunk.get(column) is not None:
chunk[f'{column}_tokens'] = chunk[column].apply(
_tokenize_values, args=(text_utils.tokenize,)
)
# 4b. Tokenize genres if available
if chunk.get(keys.GENRES) is not None:
chunk[keys.GENRES] = chunk[keys.GENRES].apply(
_tokenize_values, args=(text_utils.tokenize,)
)
# 5. Tokenize URLs
chunk[keys.URL_TOKENS] = chunk[keys.URL].apply(
_tokenize_values, args=(url_utils.tokenize,)
)
# 6. Shared preprocessing
chunk = _shared_preprocessing(
chunk,
_will_handle_birth_dates(chunk),
_will_handle_death_dates(chunk),
)
LOGGER.info('Chunk %d done', i)
yield chunk
def preprocess_target(
    goal: str, target_reader: Iterator[pd.DataFrame]
) -> pd.DataFrame:
"""Preprocess a target catalog dataset: workflow step 2.
This function consumes :class:`pandas.DataFrame` chunks and
should be pipelined after :func:`build_target`.
**Preprocessing actions:**
1. drop unneeded columns holding target DB primary keys
2. rename non-null catalog ID columns & drop others
3. drop columns with null values only
4. pair dates with their precision and drop precision columns
when applicable
5. aggregate denormalized data on target ID
6. *(shared with* :func:`preprocess_wikidata` *)*
normalize columns with names, occupations, dates, when applicable
:param goal: ``{'training', 'classification'}``.
Whether the dataset is for training or classification
:param target_reader: a dataset reader as returned by
:func:`build_target`
    :return: the preprocessed dataset as a whole :class:`pandas.DataFrame`
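
    A minimal sketch of the intended pipelining
    (argument values are illustrative)::

        ids = {'139578', '3052767'}
        target_reader = build_target('training', 'discogs', 'musician', ids)
        target = preprocess_target('training', target_reader)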
"""
utils.check_goal_value(goal)
LOGGER.info('Preprocessing target ...')
# Target data is denormalized, so we must consume the input generator
# to perform consistent aggregations later
target = pd.concat([chunk for chunk in target_reader], sort=False)
# 1. Drop target DB internal ID columns
LOGGER.info("Dropping '%s' columns ...", keys.INTERNAL_ID)
target.drop(columns=keys.INTERNAL_ID, inplace=True)
    log_dataframe_info(LOGGER, target, f"Dropped '{keys.INTERNAL_ID}' columns")
# 2. Rename non-null catalog ID column & drop others
_rename_or_drop_tid_columns(target)
# 3. Drop columns with null values only
LOGGER.info('Dropping columns with null values only ...')
_drop_null_columns(target)
# 4. Pair dates with their precision & drop precision columns
_pair_dates(target)
# 5. Aggregate denormalized data on target ID
# TODO Token lists may contain duplicate tokens
LOGGER.info("Aggregating denormalized data on '%s' column ...", keys.TID)
target = target.groupby(keys.TID).agg(lambda x: list(set(x)))
log_dataframe_info(
LOGGER, target, f"Data indexed and aggregated on '{keys.TID}' column"
)
# 6. Shared preprocessing
target = _shared_preprocessing(
target,
_will_handle_birth_dates(target),
_will_handle_death_dates(target),
)
LOGGER.info('Target preprocessing done')
return target
def _add_date_features(feature_extractor, in_both_datasets):
birth_column, death_column = keys.DATE_OF_BIRTH, keys.DATE_OF_DEATH
if in_both_datasets(birth_column):
feature_extractor.add(
features.SimilarDates(
birth_column, birth_column, label=f'{birth_column}_similar'
)
)
if in_both_datasets(death_column):
feature_extractor.add(
features.SimilarDates(
death_column, death_column, label=f'{death_column}_similar'
)
)
def _add_url_features(feature_extractor):
url_column, url_tokens_column = keys.URL, keys.URL_TOKENS
# Exact match on URLs
feature_extractor.add(
features.ExactMatch(url_column, url_column, label=f'{url_column}_exact')
)
# Match on URL tokens
feature_extractor.add(
features.SharedTokensPlus(
url_tokens_column,
url_tokens_column,
label=f'{url_tokens_column}_shared',
stop_words=text_utils.STOPWORDS_URL_TOKENS,
)
)
def _add_name_tokens_features(feature_extractor):
name_tokens_column = keys.NAME_TOKENS
# Levenshtein distance on name tokens
feature_extractor.add(
features.SimilarStrings(
name_tokens_column,
name_tokens_column,
label=f'{name_tokens_column}_levenshtein',
)
)
# String kernel similarity on name tokens
feature_extractor.add(
features.SimilarStrings(
name_tokens_column,
name_tokens_column,
algorithm='cosine',
analyzer='char_wb',
label=f'{name_tokens_column}_string_kernel_cosine',
)
)
# Weighted intersection of name tokens
feature_extractor.add(
features.SharedTokens(
name_tokens_column,
name_tokens_column,
label=f'{name_tokens_column}_shared',
)
)
def _pair_dates(target):
if _will_handle_birth_dates(target):
LOGGER.info('Pairing birth date columns with precision ones ...')
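        # Each cell becomes a (date, precision) tuple,
        # later consumed by `_parse_dates_list` during shared preprocessing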
target[keys.DATE_OF_BIRTH] = list(
zip(target[keys.DATE_OF_BIRTH], target[keys.BIRTH_PRECISION])
)
target.drop(columns=[keys.BIRTH_PRECISION], inplace=True)
log_dataframe_info(
LOGGER, target, 'Paired birth date columns with precision ones'
)
if _will_handle_death_dates(target):
LOGGER.info('Pairing death date columns with precision ones ...')
target[keys.DATE_OF_DEATH] = list(
zip(target[keys.DATE_OF_DEATH], target[keys.DEATH_PRECISION])
)
target.drop(columns=[keys.DEATH_PRECISION], inplace=True)
log_dataframe_info(
LOGGER, target, 'Paired death date columns with precision ones'
)
def _rename_or_drop_tid_columns(target):
LOGGER.info(
"Renaming '%s' column with no null values to '%s' "
"& dropping '%s' columns with null values ...",
keys.CATALOG_ID,
keys.TID,
keys.CATALOG_ID,
)
# If `catalog_id` is one column (i.e., a `Series`),
# then it won't have None values
if isinstance(target[keys.CATALOG_ID], pd.Series):
target[keys.TID] = target[keys.CATALOG_ID]
else:
no_nulls = target[keys.CATALOG_ID].dropna(axis=1)
# It may happen that more than 1 column has no null values:
# in this case, they must be identical,
# so take the first one
target[keys.TID] = (
no_nulls.iloc[:, 0] if isinstance(no_nulls, pd.DataFrame) else no_nulls
)
target.drop(columns=keys.CATALOG_ID, inplace=True)
log_dataframe_info(
LOGGER,
target,
f"Renamed '{keys.CATALOG_ID}' column with no null values to "
f"'{keys.TID}' & dropped '{keys.CATALOG_ID}' columns with null values",
)
def _shared_preprocessing(
df: pd.DataFrame, will_handle_birth_date: bool, will_handle_death_date: bool
) -> pd.DataFrame:
LOGGER.info('Normalizing columns with names ...')
for column in constants.NAME_FIELDS:
if df.get(column) is not None:
df[column] = df[column].map(_normalize_values)
_occupations_to_set(df)
if will_handle_birth_date:
LOGGER.info('Handling birth dates ...')
_handle_dates(df, keys.DATE_OF_BIRTH)
if will_handle_death_date:
LOGGER.info('Handling death dates ...')
_handle_dates(df, keys.DATE_OF_DEATH)
return df
def _handle_goal(goal, catalog, entity, dir_io):
if goal == 'training':
wd_io_path = os.path.join(
dir_io, constants.WD_TRAINING_SET.format(catalog, entity)
)
qids_and_tids = {}
elif goal == 'classification':
wd_io_path = os.path.join(
dir_io, constants.WD_CLASSIFICATION_SET.format(catalog, entity)
)
qids_and_tids = None
else:
raise ValueError(
f"Invalid 'goal' parameter: {goal}. "
f"It should be 'training' or 'classification'"
)
return qids_and_tids, wd_io_path
def _reconstruct_qids_and_tids(wd_io_path, qids_and_tids):
with gzip.open(wd_io_path, 'rt') as wd_io:
for line in wd_io:
item = json.loads(line.rstrip())
qids_and_tids[item[keys.QID]] = {keys.TID: item[keys.TID]}
LOGGER.debug(
"Reconstructed dictionary with QIDS and target IDs from '%s'",
wd_io_path,
)
def _normalize_values(values):
normalized_values = set()
if values is nan or not any(values):
return nan
for value in values:
if not value:
continue
_, normalized = text_utils.normalize(value)
normalized_values.add(normalized)
return list(normalized_values) if normalized_values else nan
def _drop_null_columns(target):
target.dropna(axis=1, how='all', inplace=True)
log_dataframe_info(LOGGER, target, 'Dropped columns with null values only')
def _handle_dates(df, column):
# Datasets are hitting pandas timestamp limitations, see
# http://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timestamp-limitations
# Parse into Period instead, see
# http://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-oob
    if df.get(column) is None:
        LOGGER.warning(
            "No '%s' column in DataFrame, won't handle its dates. "
            "Perhaps it was dropped because it contained null values only",
            column,
        )
        return
    df[column] = df[column].map(_parse_dates_list, na_action='ignore')
log_dataframe_info(LOGGER, df, 'Parsed dates')
def _will_handle_birth_dates(df: pd.DataFrame) -> bool:
dob_column = df.get(keys.DATE_OF_BIRTH)
if dob_column is None:
LOGGER.warning(
"'%s' column is not in DataFrame, won't handle birth dates. Perhaps it was dropped because they contained null values only",
keys.DATE_OF_BIRTH,
)
return False
return True
def _will_handle_death_dates(df: pd.DataFrame) -> bool:
dod_column = df.get(keys.DATE_OF_DEATH)
if dod_column is None:
LOGGER.warning(
"'%s' column is not in DataFrame, won't handle death dates. Perhaps it was dropped because they contained null values only",
keys.DATE_OF_DEATH,
)
return False
return True
def _parse_dates_list(dates_list):
dates = []
# 1990-11-06T00:00:00Z
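    # i.e., each element is a (timestamp, precision) pair,
    # where the precision is a Wikidata code listed in `vocabulary.DATE_PRECISION`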
for date, precision in dates_list:
if pd.isna(date) or pd.isna(precision):
LOGGER.debug(
'Skipping null value. Date: %s - Precision: %s', date, precision
)
continue
if precision in vocabulary.DATE_PRECISION:
if precision < vocabulary.YEAR: # From decades to billion years
LOGGER.debug(
                    'Date precision: %s. Falling back to YEAR, '
                    'since pandas.Period does not support coarser precisions',
vocabulary.DATE_PRECISION[precision],
)
_build_date_object(date, 4, dates)
elif precision == vocabulary.YEAR:
_build_date_object(date, 4, dates)
elif precision == vocabulary.MONTH:
_build_date_object(date, 7, dates)
elif precision == vocabulary.DAY:
_build_date_object(date, 10, dates)
elif precision == vocabulary.HOUR:
_build_date_object(date, 13, dates)
elif precision == vocabulary.MINUTE:
_build_date_object(date, 16, dates)
elif precision == vocabulary.SECOND:
_build_date_object(date, len(date), dates)
else:
LOGGER.warning(
'Unexpected date precision: %s. Will try to parse the date anyway',
precision,
)
_build_date_object(date, len(date), dates)
if not dates:
return pd.NaT
return dates
def _build_date_object(value, slice_index, to_dates_list):
if isinstance(value, (datetime.date, datetime.datetime)):
value = value.isoformat()
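    # Slice the ISO timestamp to the wanted precision, e.g.,
    # '1990-11-06T00:00:00Z'[:10] -> pandas.Period('1990-11-06')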
try:
to_dates_list.append(pd.Period(value[:slice_index]))
except ValueError as ve:
LOGGER.warning("Skipping date that can't be parsed: %s. Reason: %s", value, ve)
def _occupations_to_set(df):
col_name = vocabulary.LINKER_PIDS[vocabulary.OCCUPATION]
if col_name not in df.columns:
LOGGER.info("No '%s' column in DataFrame, won't handle them", col_name)
return
def to_set(itm):
# if it is an empty array (from source), or an
# empty string (from target)
if not itm:
return set()
# sanity check: itm should not be NaN
if isinstance(itm, float) and pd.isna(itm):
LOGGER.warning(
"Unexpected 'NaN' value while converting lists of occupations to sets. Treating it as an empty set."
)
return set()
# when coming from the DB, the occupations for target
# are an array with only one element which is a string
# of space separated occupations (or an empty string
# in case there are no occupations)
if len(itm) == 1:
# get inner occupation ids and remove empty occupations
itm = [x for x in itm[0].split() if x]
return set(itm)
LOGGER.info('Converting list of occupations into set ...')
df[col_name] = df[col_name].apply(to_set)
def _tokenize_values(values, tokenize_func):
if values is nan:
return nan
all_tokens = set()
for value in values:
value_tokens = tokenize_func(value)
if value_tokens:
all_tokens.update(value_tokens)
if not all_tokens:
LOGGER.debug('No tokens from list of values: %s', values)
return nan
return list(all_tokens)