# Source code for soweego.validator.checks

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""A set of checks to validate Wikidata against target catalogs."""

__author__ = 'Marco Fossati'
__email__ = 'fossati@spaziodati.eu'
__version__ = '1.0'
__license__ = 'GPL-3.0'
__copyright__ = 'Copyleft 2018, Hjfocs'

import csv
import json
import logging
import os
from collections import defaultdict
from itertools import zip_longest
from typing import DefaultDict, Dict, Iterator, List, Tuple

import click
from sqlalchemy.exc import SQLAlchemyError

from soweego.commons import constants, data_gathering, keys, target_database
from soweego.commons.db_manager import DBManager
from soweego.ingester import wikidata_bot
from soweego.wikidata import vocabulary

# Module-level logger, named after this module
LOGGER = logging.getLogger(__name__)

# File name templates: the CLI commands in this module fill the two '{}'
# placeholders with the catalog and the entity through str.format,
# then join the result with the input/output directory.

# For dead_ids_cli
DEAD_IDS_FILENAME = '{}_{}_dead_identifiers.json'
WD_IDS_FILENAME = '{}_{}_identifiers_in_wikidata.json'
# For links_cli
LINKS_IDS_TO_BE_DEPRECATED_FILENAME = '{}_{}_identifiers_to_be_deprecated.json'
EXTRA_IDS_TO_BE_ADDED_FILENAME = '{}_{}_third_party_identifiers_to_be_added.csv'
URLS_TO_BE_ADDED_FILENAME = '{}_{}_urls_to_be_added.csv'
WD_LINKS_FILENAME = '{}_{}_urls_in_wikidata.json'
# For bio_cli
# NOTE(review): the template below is the same string as
# LINKS_IDS_TO_BE_DEPRECATED_FILENAME, so links_cli and bio_cli write their
# identifiers to be deprecated to the same path for a given catalog, entity
# and directory -- confirm that this is intended
BIO_IDS_TO_BE_DEPRECATED_FILENAME = '{}_{}_identifiers_to_be_deprecated.json'
BIO_STATEMENTS_TO_BE_ADDED_FILENAME = '{}_{}_bio_statements_to_be_added.csv'
WD_BIO_FILENAME = '{}_{}_bio_data_in_wikidata.json'


@click.command()
@click.argument(
    'catalog', type=click.Choice(target_database.supported_targets())
)
@click.argument(
    'entity', type=click.Choice(target_database.supported_entities())
)
@click.option(
    '-d',
    '--deprecate',
    is_flag=True,
    help='Deprecate dead identifiers: this changes their rank in Wikidata.',
)
@click.option(
    '-s',
    '--sandbox',
    is_flag=True,
    help='Perform all deprecations on the Wikidata sandbox item Q4115189.',
)
@click.option(
    '--dump-wikidata',
    is_flag=True,
    help='Dump identifiers gathered from Wikidata to a JSON file.',
)
@click.option(
    '--dir-io',
    type=click.Path(file_okay=False),
    default=constants.SHARED_FOLDER,
    help=f'Input/output directory, default: {constants.SHARED_FOLDER}.',
)
def dead_ids_cli(catalog, entity, deprecate, sandbox, dump_wikidata, dir_io):
    """Check if identifiers are still alive.

    Dump a JSON file of dead ones. Format: { identifier: [ list of QIDs ] }

    Dead identifiers should get a deprecated rank in Wikidata:
    you can pass the '-d' flag to do so.
    """
    # NOTE: the docstring above is rendered by click as the command help text,
    # so it is part of the user-facing output.
    dead_ids_path = os.path.join(
        dir_io, DEAD_IDS_FILENAME.format(catalog, entity)
    )
    wd_ids_path = os.path.join(dir_io, WD_IDS_FILENAME.format(catalog, entity))

    # Handle Wikidata cache: reuse identifiers dumped by a previous run, if any.
    # dead_ids hands back the very cache it was given as its second value,
    # so a single call covers both the cached and the uncached case
    wd_cache = None
    if os.path.isfile(wd_ids_path):
        with open(wd_ids_path) as wdin:
            wd_cache = _load_wd_cache(wdin)
    dead, wd_ids = dead_ids(catalog, entity, wd_cache=wd_cache)

    # Dump ids gathered from Wikidata
    if dump_wikidata:
        _dump_wd_cache(wd_ids, wd_ids_path)
        LOGGER.info(
            'Identifiers gathered from Wikidata dumped to %s', wd_ids_path
        )

    # Dump dead ids
    with open(dead_ids_path, 'w') as fout:
        # Sets are not serializable to JSON, so cast them to lists
        json.dump(
            {target_id: list(qids) for target_id, qids in dead.items()},
            fout,
            indent=2,
        )

    LOGGER.info('Dead identifiers dumped to %s', dead_ids_path)

    # Deprecate dead ids in Wikidata.
    # This command has nothing to add, so call the deprecation step directly:
    # going through _upload_result with None payloads would also trigger
    # two statement additions fed with None
    if deprecate:
        LOGGER.info('Starting deprecation of %s IDs ...', catalog)
        wikidata_bot.delete_or_deprecate_identifiers(
            'deprecate', catalog, entity, dead, sandbox
        )


@click.command()
@click.argument(
    'catalog', type=click.Choice(target_database.supported_targets())
)
@click.argument(
    'entity', type=click.Choice(target_database.supported_entities())
)
@click.option(
    '-u', '--upload', is_flag=True, help='Upload the output to Wikidata.'
)
@click.option(
    '-s',
    '--sandbox',
    is_flag=True,
    help='Perform all edits on the Wikidata sandbox item Q4115189.',
)
@click.option(
    '--dump-wikidata',
    is_flag=True,
    help='Dump URLs gathered from Wikidata to a JSON file.',
)
@click.option(
    '--dir-io',
    type=click.Path(file_okay=False),
    default=constants.SHARED_FOLDER,
    help=f'Input/output directory, default: {constants.SHARED_FOLDER}.',
)
def links_cli(catalog, entity, upload, sandbox, dump_wikidata, dir_io):
    """Validate identifiers against links.

    Dump 3 output files:

    1. target identifiers to be deprecated. Format: (JSON) {identifier: [list of QIDs]}

    2. third-party identifiers to be added. Format: (CSV) QID,identifier_PID,identifier

    3. URLs to be added. Format: (CSV) QID,P973,URL

    You can pass the '-u' flag to upload the output to Wikidata.
    """
    # NOTE: the docstring above is rendered by click as the command help text,
    # so it is part of the user-facing output.

    # Output paths
    deprecated_path = os.path.join(
        dir_io, LINKS_IDS_TO_BE_DEPRECATED_FILENAME.format(catalog, entity)
    )
    ids_path = os.path.join(
        dir_io, EXTRA_IDS_TO_BE_ADDED_FILENAME.format(catalog, entity)
    )
    urls_path = os.path.join(
        dir_io, URLS_TO_BE_ADDED_FILENAME.format(catalog, entity)
    )
    wd_links_path = os.path.join(
        dir_io, WD_LINKS_FILENAME.format(catalog, entity)
    )

    # Handle Wikidata cache
    if os.path.isfile(wd_links_path):
        with open(wd_links_path) as wdin:
            wd_links = _load_wd_cache(wdin)
        # Discard the last return value: Wikidata cache
        ids_to_be_deprecated, ids_to_be_added, urls_to_be_added, _ = links(
            catalog, entity, wd_cache=wd_links
        )
    else:
        ids_to_be_deprecated, ids_to_be_added, urls_to_be_added, wd_links = links(
            catalog, entity
        )

    # Nothing to do: the catalog doesn't contain links
    if ids_to_be_deprecated is None:
        return

    # The same results are written to CSV first and uploaded afterwards.
    # A one-shot iterator would be exhausted by the CSV dump, leaving nothing
    # to upload, and would always look non-empty to the dump helper.
    # Presumably links() returns generators here -- TODO confirm.
    # Anything that is not an iterator is left untouched
    if isinstance(ids_to_be_added, Iterator):
        ids_to_be_added = list(ids_to_be_added)
    if isinstance(urls_to_be_added, Iterator):
        urls_to_be_added = list(urls_to_be_added)

    # Dump Wikidata cache
    if dump_wikidata:
        _dump_wd_cache(wd_links, wd_links_path)
        LOGGER.info('URLs gathered from Wikidata dumped to %s', wd_links_path)

    # Dump output files
    _dump_deprecated(ids_to_be_deprecated, deprecated_path)
    _dump_csv_output(ids_to_be_added, ids_path, 'third-party IDs')
    _dump_csv_output(urls_to_be_added, urls_path, 'URLs')

    # Upload the output to Wikidata
    if upload:
        _upload_result(
            catalog,
            entity,
            ids_to_be_deprecated,
            urls_to_be_added,
            ids_to_be_added,
            sandbox,
        )


@click.command()
@click.argument(
    'catalog', type=click.Choice(target_database.supported_targets())
)
@click.argument(
    'entity', type=click.Choice(target_database.supported_entities())
)
@click.option(
    '-u', '--upload', is_flag=True, help='Upload the output to Wikidata.'
)
@click.option(
    '-s',
    '--sandbox',
    is_flag=True,
    help='Perform all edits on the Wikidata sandbox item Q4115189.',
)
@click.option(
    '--dump-wikidata',
    is_flag=True,
    help='Dump biographical data gathered from Wikidata to a JSON file.',
)
@click.option(
    '--dir-io',
    type=click.Path(file_okay=False),
    default=constants.SHARED_FOLDER,
    help=f'Input/output directory, default: {constants.SHARED_FOLDER}.',
)
def bio_cli(catalog, entity, upload, sandbox, dump_wikidata, dir_io):
    """Validate identifiers against biographical data.

    Look for birth/death dates, birth/death places, gender.

    Dump 2 output files:

    1. target identifiers to be deprecated. Format: (JSON) {identifier: [list of QIDs]}

    2. statements to be added. Format: (CSV) QID,metadata_PID,value

    You can pass the '-u' flag to upload the output to Wikidata.
    """
    # NOTE: the docstring above is rendered by click as the command help text,
    # so it is part of the user-facing output.
    deprecated_path = os.path.join(
        dir_io, BIO_IDS_TO_BE_DEPRECATED_FILENAME.format(catalog, entity)
    )
    statements_path = os.path.join(
        dir_io, BIO_STATEMENTS_TO_BE_ADDED_FILENAME.format(catalog, entity)
    )
    wd_bio_path = os.path.join(dir_io, WD_BIO_FILENAME.format(catalog, entity))

    # Handle Wikidata cache: reuse data dumped by a previous run, if any.
    # bio hands back the very cache it was given as its last value,
    # so a single call covers both the cached and the uncached case
    wd_cache = None
    if os.path.isfile(wd_bio_path):
        with open(wd_bio_path) as wdin:
            wd_cache = _load_wd_cache(wdin)
    to_be_deprecated, to_be_added, wd_bio = bio(
        catalog, entity, wd_cache=wd_cache
    )

    # Nothing to do: the catalog doesn't contain biographical data
    if to_be_deprecated is None:
        return

    # bio returns the statements as a one-shot generator, but they are needed
    # twice: for the CSV dump and for the upload. Materialize them once,
    # otherwise the dump exhausts the generator and the upload gets nothing.
    # A list also lets the dump helper detect that there are no statements
    to_be_added = list(to_be_added)

    # Dump Wikidata cache
    if dump_wikidata:
        _dump_wd_cache(wd_bio, wd_bio_path)
        LOGGER.info(
            'Biographical data gathered from Wikidata dumped to %s', wd_bio_path
        )

    # Dump output files
    _dump_deprecated(to_be_deprecated, deprecated_path)
    _dump_csv_output(to_be_added, statements_path, 'statements')

    # Upload the output to Wikidata
    if upload:
        _upload(catalog, entity, to_be_deprecated, to_be_added, sandbox)


def dead_ids(
    catalog: str, entity: str, wd_cache=None
) -> Tuple[DefaultDict, Dict]:
    """Find Wikidata identifiers that no longer exist in a target catalog.

    An identifier counts as dead when the target catalog has no entry for it
    at the time this function runs.
    Dead identifiers should get a deprecated rank in Wikidata.

    **How it works:**

    1. collect the identifiers of the given catalog from Wikidata items,
       unless a cache from a previous run is passed
    2. count the catalog entries carrying each identifier
    3. report every identifier with zero entries, together with the QIDs
       that hold it

    A database error interrupts the lookup: it is logged, the session is
    rolled back, and the dead identifiers found so far are returned.

    :param catalog: ``{'discogs', 'imdb', 'musicbrainz'}``.
      A supported catalog
    :param entity: ``{'actor', 'band', 'director', 'musician', 'producer',
      'writer', 'audiovisual_work', 'musical_work'}``. A supported entity
    :param wd_cache: (optional) a ``dict`` of identifiers gathered from
      Wikidata in a previous run
    :return: the ``dict`` pair of dead identifiers, i.e.,
      ``{identifier: {QIDs}}``, and identifiers gathered from Wikidata
    """
    dead = defaultdict(set)
    db_entity = target_database.get_main_entity(catalog, entity)

    # Wikidata side: a cache from a previous run wins over a fresh gathering
    if wd_cache is not None:
        wd_ids = wd_cache
    else:
        wd_ids = {}
        catalog_pid = target_database.get_catalog_pid(catalog, entity)
        data_gathering.gather_target_ids(entity, catalog, catalog_pid, wd_ids)

    # Target catalog side
    session = DBManager.connect_to_db()

    try:
        for qid, item in wd_ids.items():
            for tid in item[keys.TID]:
                matches = session.query(db_entity.catalog_id).filter_by(
                    catalog_id=tid
                )
                # At least one catalog entry: the identifier is alive
                if matches.count() != 0:
                    continue

                LOGGER.debug('%s %s identifier %s is dead', qid, catalog, tid)
                dead[tid].add(qid)
        session.commit()
    except SQLAlchemyError as error:
        LOGGER.error(
            "Failed query of target catalog identifiers due to %s. "
            "You can enable the debug log with the CLI option "
            "'-l soweego.validator DEBUG' for more details",
            error.__class__.__name__,
        )
        LOGGER.debug(error)
        session.rollback()
    finally:
        session.close()

    LOGGER.info(
        'Check completed. Target: %s %s. Total dead identifiers: %d',
        catalog,
        entity,
        len(dead),
    )
    return dead, wd_ids


def bio(
    catalog: str, entity: str, wd_cache=None
) -> Tuple[DefaultDict, Iterator, Dict]:
    """Check identifiers against the biographical data of a target catalog.

    The data looked at is:

    - birth and death dates
    - birth and death places
    - gender

    Data that is only available in the catalog is turned into statements
    that can be used to enrich Wikidata items.

    **How it works:**

    1. gather data from the given catalog
    2. gather data from relevant Wikidata items,
       unless a cache from a previous run is passed
    3. compare each pair of Wikidata and catalog items:

       - a pair that shares no data means the catalog identifier
         should be marked with a deprecated rank
       - data held by the catalog item only should be added
         to the Wikidata item

    :param catalog: ``{'discogs', 'imdb', 'musicbrainz'}``.
      A supported catalog
    :param entity: ``{'actor', 'band', 'director', 'musician', 'producer',
      'writer', 'audiovisual_work', 'musical_work'}``. A supported entity
    :param wd_cache: (optional) a ``dict`` of biographical data gathered
      from Wikidata in a previous run
    :return: 3 objects, or 3 ``None`` values when the catalog
      has no biographical data

      1. ``dict`` of identifiers that should be deprecated
      2. ``generator`` of statements that should be added
      3. ``dict`` of biographical data gathered from Wikidata

    """
    # Target catalog side first:
    # bail out early when there is no target data
    target_bio = data_gathering.gather_target_biodata(entity, catalog)
    if target_bio is None:
        return None, None, None

    to_be_deprecated = defaultdict(set)
    to_be_added = defaultdict(set)

    # Wikidata side: a cache from a previous run wins over a fresh gathering
    if wd_cache is not None:
        wd_bio = wd_cache
    else:
        wd_bio = {}
        catalog_pid = target_database.get_catalog_pid(catalog, entity)
        data_gathering.gather_target_ids(entity, catalog, catalog_pid, wd_bio)
        data_gathering.gather_wikidata_biodata(wd_bio)

    # Validation: fills both result dictionaries in place
    _validate(keys.BIODATA, wd_bio, target_bio, to_be_deprecated, to_be_added)

    statements = _bio_to_be_added_generator(to_be_added)
    return to_be_deprecated, statements, wd_bio


def _bio_to_be_added_generator(to_be_added):
    """Flatten ``{QID: {(PID, value), ...}}`` into ``(QID, PID, value)`` triples.

    :param to_be_added: a ``dict`` mapping each QID to an iterable of
      ``(PID, value)`` pairs
    :return: a one-shot ``generator`` of ``(QID, PID, value)`` triples
    """
    for qid, values in to_be_added.items():
        for pid, value in values:
            yield qid, pid, value


def _validate(criterion, wd, target_generator, to_be_deprecated, to_be_added):
    """Compare Wikidata and target data, filling the 2 output dictionaries.

    :param criterion: the key of the data to compare in each Wikidata entry
    :param wd: a ``dict`` ``{QID: data}``, where ``data`` holds the target
      identifiers under ``keys.TID`` and a set of values under ``criterion``
    :param target_generator: an iterable of ``(identifier, *data)`` rows
    :param to_be_deprecated: output ``{identifier: {QIDs}}``, updated in place
      with the pairs that share no data
    :param to_be_added: output ``{QID: {values}}``, updated in place
      with the data only available in the target
    """
    LOGGER.info('Starting check against target %s ...', criterion)
    target = _consume_target_generator(target_generator)

    # Large loop size: total Wikidata class instances with identifiers,
    # e.g., 80k musicians
    for qid, data in wd.items():
        wd_data = data.get(criterion)

        # Skip when no Wikidata data
        if not wd_data:
            LOGGER.warning(
                'Skipping check: no %s available in QID %s (Wikidata side)',
                criterion,
                qid,
            )
            continue

        wd_tids = data[keys.TID]
        # 1 or tiny loop size: total target IDs per Wikidata item
        # (it should almost always be 1)
        for tid in wd_tids:
            # Identifiers unknown to the target are silently ignored.
            # The membership test comes first, so that the lookup below
            # never adds an empty entry to the defaultdict
            if tid not in target:
                continue

            target_data = target[tid]

            # Skip when no target data
            if not target_data:
                LOGGER.warning(
                    'Skipping check: no %s available in '
                    'target ID %s (target side)',
                    criterion,
                    tid,
                )
                continue

            shared_data, extra_data = _compute_shared_and_extra(
                criterion, wd_data, target_data
            )

            if not shared_data:
                LOGGER.debug(
                    'No shared %s between %s and %s. The identifier '
                    'statement should be deprecated',
                    criterion,
                    qid,
                    tid,
                )
                to_be_deprecated[tid].add(qid)
            else:
                LOGGER.debug(
                    '%s and %s share these %s: %s',
                    qid,
                    tid,
                    criterion,
                    shared_data,
                )

            if extra_data:
                LOGGER.debug(
                    '%s has extra %s that should be added to %s: %s',
                    tid,
                    criterion,
                    qid,
                    extra_data,
                )
                to_be_added[qid].update(extra_data)
            else:
                LOGGER.debug('%s has no extra %s', tid, criterion)

    LOGGER.info(
        'Check against target %s completed: %d IDs to be deprecated, '
        '%d Wikidata items with statements to be added',
        criterion,
        len(to_be_deprecated),
        len(to_be_added),
    )


def _compute_shared_and_extra(criterion, wd_data, target_data):
    """Split target data into what Wikidata already has and what it lacks.

    The input sets are left untouched.

    :param criterion: the kind of data under comparison.
      Dates get a dedicated comparison when it is ``keys.BIODATA``
    :param wd_data: a ``set`` of Wikidata values
    :param target_data: a ``set`` of target values
    :return: the pair of ``set`` objects ``(shared, extra)``
    """
    # Properly compare dates when checking biographical data
    if criterion == keys.BIODATA:
        # Work on copies, since _extract_dates strips the dates from its input.
        # The originals are reused across comparisons and the Wikidata one
        # may later be dumped as a cache, so they must keep their dates
        wd_other, target_other = set(wd_data), set(target_data)
        wd_dates = _extract_dates(wd_other)
        target_dates = _extract_dates(target_other)
        shared_dates, extra_dates = _compare_dates(wd_dates, target_dates)

        shared = wd_other.intersection(target_other).union(shared_dates)
        extra = target_other.difference(wd_other).union(extra_dates)
    else:
        shared = wd_data.intersection(target_data)
        extra = target_data.difference(wd_data)

    return shared, extra


def _extract_dates(data):
    """Move birth and death dates out of the given set.

    CAUTION: ``data`` is modified in place, the returned dates
    are removed from it.

    :param data: a ``set`` of ``(PID, value)`` pairs
    :return: the ``set`` of pairs whose PID is a date of birth or death
    """
    dates = set()

    for pid, value in data:
        if pid in (vocabulary.DATE_OF_BIRTH, vocabulary.DATE_OF_DEATH):
            dates.add((pid, value))

    # Remove dates from input set
    data.difference_update(dates)

    return dates


def _compare_dates(wd, target):
    """Match target dates against Wikidata dates with the same PID.

    Sets have no order, so dates are grouped by PID instead of being paired
    by position: this way a birth date is never compared with a death one.

    A target date is only considered when Wikidata has at least one date
    with the same PID. It is extra when it matches none of them.

    :param wd: a ``set`` of Wikidata ``(PID, 'timestamp/precision')`` pairs
    :param target: a ``set`` of target ``(PID, 'timestamp/precision')`` pairs
    :return: the pair of ``set`` objects ``(shared_dates, extra_dates)``
    """
    shared_dates, extra_dates = set(), set()

    wd_by_pid = defaultdict(list)
    for wd_pid, wd_val in wd:
        wd_by_pid[wd_pid].append((wd_pid, wd_val))

    for t_pid, t_val in target:
        t_elem = (t_pid, t_val)
        matched, mismatch = False, None

        # Don't compare birth with death dates: same PID only
        for wd_elem in wd_by_pid.get(t_pid, ()):
            wd_val = wd_elem[1]

            # Skip unexpected None values
            if None in (wd_val, t_val):
                LOGGER.warning(
                    'Skipping unexpected %s date pair with missing value(s)',
                    (wd_elem, t_elem),
                )
                continue

            wd_timestamp, wd_precision = wd_val.split('/')
            t_timestamp, t_precision = t_val.split('/')

            # Compare at the coarsest precision of the two dates
            shared_date, extra_date = _match_dates_by_precision(
                min(int(wd_precision), int(t_precision)),
                wd_elem,
                wd_timestamp,
                t_elem,
                t_timestamp,
            )

            if shared_date is not None:
                shared_dates.add(shared_date)
                matched = True
            elif extra_date is not None:
                mismatch = extra_date

        # The target date is extra only when no Wikidata date agrees with it
        if not matched and mismatch is not None:
            extra_dates.add(mismatch)

    return shared_dates, extra_dates


def _match_dates_by_precision(
    precision, wd_elem, wd_timestamp, t_elem, t_timestamp
):
    """Compare 2 timestamps, truncated according to the given precision.

    :param precision: one of ``vocabulary.YEAR``, ``vocabulary.MONTH``,
      ``vocabulary.DAY``. Any other value makes the pair not comparable
    :param wd_elem: the Wikidata ``(PID, value)`` pair
    :param wd_timestamp: the timestamp part of the Wikidata value
    :param t_elem: the target ``(PID, value)`` pair
    :param t_timestamp: the timestamp part of the target value
    :return: the pair ``(shared, extra)``. ``shared`` is ``wd_elem`` on match,
      ``extra`` is ``(PID, target timestamp)`` on mismatch.
      Both are ``None`` when the pair is not comparable
    """
    # How many leading characters of a timestamp to compare.
    # Presumably timestamps start with 'YYYY-MM-DD' -- TODO confirm
    slice_indices = {
        vocabulary.YEAR: 4,
        vocabulary.MONTH: 7,
        vocabulary.DAY: 10,
    }
    index = slice_indices.get(precision)

    if index is None:
        LOGGER.info(
            "Won't try to match date pair %s: too low or too high precision",
            (wd_elem, t_elem),
        )
        return None, None

    shared, extra = None, None
    wd_simplified = wd_timestamp[:index]
    t_simplified = t_timestamp[:index]

    if wd_simplified == t_simplified:
        LOGGER.debug(
            'Date pair %s matches on %s',
            (wd_timestamp, t_timestamp),
            (wd_simplified, t_simplified),
        )
        shared = wd_elem
    else:
        LOGGER.debug('Target has an extra date: %s', t_timestamp)
        # Output dates in ISO format
        # t_elem[0] is the PID
        extra = (t_elem[0], t_timestamp)

    return shared, extra


def _upload_result(
    catalog, entity, to_deprecate, urls_to_add, ext_ids_to_add, sandbox
):
    """Deprecate identifiers, then add URLs and external IDs to Wikidata.

    :param to_deprecate: ``{identifier: {QIDs}}`` to be deprecated
    :param urls_to_add: statements to be added, or ``None`` to skip them
    :param ext_ids_to_add: statements to be added, or ``None`` to skip them
    :param sandbox: whether to perform the edits on the sandbox item
    """
    _upload(catalog, entity, to_deprecate, urls_to_add, sandbox)

    # None means that there is nothing to add: don't hand it to the bot
    if ext_ids_to_add is not None:
        LOGGER.info('Starting addition of external IDs to Wikidata ...')
        wikidata_bot.add_people_statements(ext_ids_to_add, sandbox)


def _upload(catalog, entity, to_deprecate, to_add, sandbox):
    """Deprecate identifiers, then add statements to Wikidata.

    :param to_deprecate: ``{identifier: {QIDs}}`` to be deprecated
    :param to_add: statements to be added, or ``None`` to skip them
    :param sandbox: whether to perform the edits on the sandbox item
    """
    LOGGER.info('Starting deprecation of %s IDs ...', catalog)
    wikidata_bot.delete_or_deprecate_identifiers(
        'deprecate', catalog, entity, to_deprecate, sandbox
    )

    # None means that there is nothing to add: don't hand it to the bot
    if to_add is not None:
        LOGGER.info('Starting addition of statements to Wikidata ...')
        wikidata_bot.add_people_statements(to_add, sandbox)


def _dump_deprecated(data, outpath):
    """Write ``{identifier: [QIDs]}`` to a JSON file, unless there is no data."""
    if data:
        with open(outpath, 'w') as deprecated:
            # Sets are not serializable to JSON, so cast them to lists
            json.dump(
                {target_id: list(qids) for target_id, qids in data.items()},
                deprecated,
                indent=2,
            )
        LOGGER.info('IDs to be deprecated dumped to %s', outpath)
    else:
        LOGGER.info("No IDs to be deprecated, won't dump to file")


def _dump_csv_output(data, outpath, log_msg_subject):
    """Write rows to a CSV file, unless there is no data.

    NOTE: a generator is always truthy and gets exhausted by the dump.
    Pass a list when the rows are needed again, or may be empty.

    :param data: an iterable of rows
    :param outpath: where to write the CSV file
    :param log_msg_subject: what the rows are, for log messages only
    """
    if data:
        # The csv module handles line endings itself and requires newline=''
        with open(outpath, 'w', newline='') as ids_out:
            writer = csv.writer(ids_out)
            writer.writerows(data)
        LOGGER.info('%s to be added dumped to %s', log_msg_subject, outpath)
    else:
        LOGGER.info("No %s to be added, won't dump to file", log_msg_subject)


def _load_wd_cache(file_handle):
    """Load a Wikidata cache from an open JSON file.

    JSON has no sets nor tuples, so lists are converted back:
    each list of values becomes a ``set``, each ``[PID, value]`` pair
    becomes a ``tuple``, and each ``[PID, [values]]`` pair is expanded
    into one ``(PID, value)`` tuple per value.

    :param file_handle: a file object open for reading
    :return: the cache ``dict`` ``{QID: {data_type: set of values}}``.
      QIDs without any data type are left out
    """
    raw_cache = json.load(file_handle)
    LOGGER.info("Loaded Wikidata cache from '%s'", file_handle.name)
    cache = {}

    for qid, data in raw_cache.items():
        for data_type, value_list in data.items():
            # Biodata has values that are a list.
            # The first value tells the shape of them all, so an empty list
            # must be ruled out first: it just becomes an empty set
            if value_list and isinstance(value_list[0], list):
                value_set = set()
                for value in value_list:
                    if isinstance(value[1], list):
                        same_pid, different_values = value[0], value[1]
                        for val in different_values:
                            value_set.add((same_pid, val))
                    else:
                        value_set.add(tuple(value))
            else:
                value_set = set(value_list)

            cache.setdefault(qid, {})[data_type] = value_set

    return cache


def _dump_wd_cache(cache, outpath):
    """Write a Wikidata cache ``{QID: {data_type: values}}`` to a JSON file."""
    with open(outpath, 'w') as outfile:
        # Sets are not serializable to JSON, so cast them to lists
        json.dump(
            {
                qid: {
                    data_type: list(values)
                    for data_type, values in data.items()
                }
                for qid, data in cache.items()
            },
            outfile,
            indent=2,
            ensure_ascii=False,
        )


def _consume_target_generator(target_generator):
    """Group target rows by identifier.

    :param target_generator: an iterable of ``(identifier, *data)`` rows
    :return: a ``defaultdict`` ``{identifier: set of values}``, where a value
      is the bare item of a 2-item row, or the ``tuple`` of the remaining
      items of a longer row
    """
    target = defaultdict(set)

    for identifier, *data in target_generator:
        if len(data) == 1:  # Links
            target[identifier].add(data.pop())
        else:  # Biographical data
            target[identifier].add(tuple(data))

    return target