Source code for soweego.validator.checks

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""A set of checks to validate Wikidata against target catalogs."""

__author__ = 'Marco Fossati'
__email__ = 'fossati@spaziodati.eu'
__version__ = '2.0'
__license__ = 'GPL-3.0'
__copyright__ = 'Copyleft 2021, Hjfocs'

import csv
import json
import logging
import os
import pickle
from collections import defaultdict
from re import match
from typing import DefaultDict, Dict, Iterator, Optional, Tuple

import click
from sqlalchemy.exc import SQLAlchemyError

from soweego.commons import constants, data_gathering, keys, target_database, text_utils
from soweego.commons.db_manager import DBManager
from soweego.ingester import wikidata_bot
from soweego.wikidata import api_requests, vocabulary
from soweego.wikidata.api_requests import get_url_blacklist

LOGGER = logging.getLogger(__name__)

# File name templates
# For all CLIs
WD_CACHE_FNAME = '{catalog}_{entity}_{criterion}_wd_cache.pkl'
# For `links_cli` and `bio_cli`
IDS_TO_BE_DEPRECATED_FNAME = '{catalog}_{entity}_{criterion}_ids_to_be_deprecated.json'
SHARED_STATEMENTS_FNAME = '{catalog}_{entity}_{criterion}_shared_statements.csv'
WD_STATEMENTS_FNAME = 'wikidata_{criterion}_for_{catalog}_{entity}.csv'
# For `dead_ids_cli`
DEAD_IDS_FNAME = '{catalog}_{entity}_dead_ids.json'
# For `links_cli`
EXT_IDS_FNAME = '{catalog}_{entity}_external_ids_to_be_{task}.csv'
URLS_FNAME = '{catalog}_{entity}_urls_to_be_{task}.csv'
# For `bio_cli`
BIO_STATEMENTS_TO_BE_ADDED_FNAME = '{catalog}_{entity}_bio_statements_to_be_added.csv'

# URL prefixes for catalog providers
QID_PREFIX = 'https://www.wikidata.org/wiki/'
PID_PREFIX = QID_PREFIX + 'Property:'


@click.command()
@click.argument('catalog', type=click.Choice(target_database.supported_targets()))
@click.argument('entity', type=click.Choice(target_database.supported_entities()))
@click.option(
    '-d',
    '--deprecate',
    is_flag=True,
    help='Deprecate dead identifiers: this changes their rank in Wikidata.',
)
@click.option(
    '-s',
    '--sandbox',
    is_flag=True,
    help=f'Perform all deprecations on the Wikidata sandbox item {vocabulary.SANDBOX_2}.',
)
@click.option(
    '--dump-wikidata',
    is_flag=True,
    help='Dump identifiers gathered from Wikidata to a Python pickle.',
)
@click.option(
    '--dir-io',
    type=click.Path(file_okay=False),
    default=constants.WORK_DIR,
    help=f'Input/output directory, default: {constants.WORK_DIR}.',
)
def dead_ids_cli(catalog, entity, deprecate, sandbox, dump_wikidata, dir_io):
    """Check if identifiers are still alive.

    Dump a JSON file of dead ones. Format: { identifier: [ list of QIDs ] }

    Dead identifiers should get a deprecated rank in Wikidata:
    you can pass the '-d' flag to do so.
    """
    dead_ids_path = os.path.join(
        dir_io, DEAD_IDS_FNAME.format(catalog=catalog, entity=entity)
    )
    wd_cache_path = os.path.join(
        dir_io,
        WD_CACHE_FNAME.format(catalog=catalog, entity=entity, criterion='dead_ids'),
    )

    # Handle Wikidata cache
    if os.path.isfile(wd_cache_path):
        with open(wd_cache_path, 'rb') as cin:
            wd_cache = pickle.load(cin)
        LOGGER.info("Loaded Wikidata cache from '%s'", cin.name)
        # Discard the second return value: Wikidata cache
        dead, _ = dead_ids(catalog, entity, wd_cache=wd_cache)
    else:
        dead, wd_cache = dead_ids(catalog, entity)

    # Dump dead ids
    with open(dead_ids_path, 'w') as fout:
        # Sets are not serializable to JSON, so cast them to lists
        json.dump(
            {target_id: list(qids) for target_id, qids in dead.items()},
            fout,
            indent=2,
        )
    LOGGER.info('Dead identifiers dumped to %s', dead_ids_path)

    # Dump Wikidata cache
    if dump_wikidata:
        try:
            with open(wd_cache_path, 'wb') as cout:
                # Using the highest protocol available for the current Python
                # version should be the most efficient solution
                pickle.dump(wd_cache, cout, protocol=pickle.HIGHEST_PROTOCOL)
            LOGGER.info(
                'Identifiers gathered from Wikidata dumped to %s', wd_cache_path
            )
        except MemoryError:
            LOGGER.warning('Could not pickle the Wikidata cache: memory error')

    # Deprecate dead ids in Wikidata
    if deprecate:
        LOGGER.info('Starting deprecation of %s IDs ...', catalog)
        wikidata_bot.delete_or_deprecate_identifiers(
            'deprecate', catalog, entity, dead, sandbox
        )


@click.command()
@click.argument('catalog', type=click.Choice(target_database.supported_targets()))
@click.argument('entity', type=click.Choice(target_database.supported_entities()))
@click.option(
    '-b',
    '--blacklist',
    is_flag=True,
    help='Filter low-quality URLs through a blacklist.',
)
@click.option('-u', '--upload', is_flag=True, help='Upload the output to Wikidata.')
@click.option(
    '-s',
    '--sandbox',
    is_flag=True,
    help=f'Perform all edits on the Wikidata sandbox item {vocabulary.SANDBOX_2}.',
)
@click.option(
    '--dump-wikidata',
    is_flag=True,
    help='Dump URLs gathered from Wikidata to a Python pickle.',
)
@click.option(
    '--dir-io',
    type=click.Path(file_okay=False),
    default=constants.WORK_DIR,
    help=f'Input/output directory, default: {constants.WORK_DIR}.',
)
def links_cli(catalog, entity, blacklist, upload, sandbox, dump_wikidata, dir_io):
    """Validate identifiers against links.

    Dump 6 output files:

    1. catalog IDs to be deprecated.
    JSON format: {catalog_ID: [list of QIDs]}

    2. third-party IDs to be added.
    CSV format: QID,third-party_PID,third-party_ID,catalog_ID

    3. URLs to be added.
    CSV format: QID,P2888,URL,catalog_ID

    4. third-party IDs to be referenced.
    Same format as file #2

    5. URLs to be referenced.
    Same format as file #3

    6. URLs found in Wikidata but not in the target catalog.
    CSV format: catalog_ID,URL,QID_URL

    You can pass the '-u' flag to upload the output to Wikidata.

    The '-b' flag applies a URL blacklist of low-quality Web domains to file #3.
    """
    criterion = 'links'
    # Output paths
    deprecate_path = os.path.join(
        dir_io,
        IDS_TO_BE_DEPRECATED_FNAME.format(
            catalog=catalog, entity=entity, criterion=criterion
        ),
    )
    add_ext_ids_path = os.path.join(
        dir_io,
        EXT_IDS_FNAME.format(catalog=catalog, entity=entity, task='added'),
    )
    add_urls_path = os.path.join(
        dir_io, URLS_FNAME.format(catalog=catalog, entity=entity, task='added')
    )
    ref_ext_ids_path = os.path.join(
        dir_io,
        EXT_IDS_FNAME.format(catalog=catalog, entity=entity, task='referenced'),
    )
    ref_urls_path = os.path.join(
        dir_io,
        URLS_FNAME.format(catalog=catalog, entity=entity, task='referenced'),
    )
    wd_urls_path = os.path.join(
        dir_io,
        WD_STATEMENTS_FNAME.format(criterion=criterion, catalog=catalog, entity=entity),
    )
    wd_cache_path = os.path.join(
        dir_io,
        WD_CACHE_FNAME.format(catalog=catalog, entity=entity, criterion=criterion),
    )

    # Wikidata cache
    wd_cache = None
    if os.path.isfile(wd_cache_path):
        with open(wd_cache_path, 'rb') as cin:
            wd_cache = pickle.load(cin)
        LOGGER.info("Loaded Wikidata cache from '%s'", cin.name)

    # Run validation
    result = links(catalog, entity, url_blacklist=blacklist, wd_cache=wd_cache)

    # Nothing to do: the catalog doesn't contain links
    if result is None:
        return

    # Unpack the result tuple
    (
        deprecate,
        add_ext_ids,
        add_urls,
        ref_ext_ids,
        ref_urls,
        wd_urls,
        wd_cache,
    ) = result
    # Dump output files
    _dump_deprecated(deprecate, deprecate_path)
    _dump_csv_output(add_ext_ids, add_ext_ids_path, 'third-party IDs to be added')
    _dump_csv_output(add_urls, add_urls_path, 'URLs to be added')
    _dump_csv_output(
        ref_ext_ids, ref_ext_ids_path, 'shared third-party IDs to be referenced'
    )
    _dump_csv_output(ref_urls, ref_urls_path, 'shared URLs to be referenced')
    _dump_csv_output(wd_urls, wd_urls_path, f'Wikidata URLs not in {catalog} {entity}')

    # Dump Wikidata cache
    if dump_wikidata:
        try:
            with open(wd_cache_path, 'wb') as cout:
                # Using the highest protocol available for the current Python
                # version should be the most efficient solution
                pickle.dump(wd_cache, cout, protocol=pickle.HIGHEST_PROTOCOL)
            LOGGER.info('URLs gathered from Wikidata dumped to %s', wd_cache_path)
        except MemoryError:
            LOGGER.warning('Could not pickle the Wikidata cache: memory error')

    # Upload the output to Wikidata
    if upload:
        if sandbox:
            LOGGER.info(
                'Running on the Wikidata sandbox item %s ...',
                vocabulary.SANDBOX_2,
            )
        LOGGER.info('Starting deprecation of %s IDs ...', catalog)
        wikidata_bot.delete_or_deprecate_identifiers(
            'deprecate', catalog, entity, deprecate, sandbox
        )
        LOGGER.info('Starting addition of external IDs to Wikidata ...')
        wikidata_bot.add_people_statements(catalog, add_ext_ids, criterion, sandbox)
        LOGGER.info('Starting addition of URLs to Wikidata ...')
        wikidata_bot.add_people_statements(catalog, add_urls, criterion, sandbox)
        LOGGER.info('Starting referencing of shared external IDs in Wikidata ...')
        wikidata_bot.add_people_statements(catalog, add_ext_ids, criterion, sandbox)
        LOGGER.info('Starting referencing of shared URLs in Wikidata ...')
        wikidata_bot.add_people_statements(catalog, add_urls, criterion, sandbox)


@click.command()
@click.argument('catalog', type=click.Choice(target_database.supported_targets()))
@click.argument('entity', type=click.Choice(target_database.supported_entities()))
@click.option('-u', '--upload', is_flag=True, help='Upload the output to Wikidata.')
@click.option(
    '-s',
    '--sandbox',
    is_flag=True,
    help=f'Perform all edits on the Wikidata sandbox item {vocabulary.SANDBOX_2}.',
)
@click.option(
    '--dump-wikidata',
    is_flag=True,
    help='Dump biographical data gathered from Wikidata to a Python pickle.',
)
@click.option(
    '--dir-io',
    type=click.Path(file_okay=False),
    default=constants.WORK_DIR,
    help=f'Input/output directory, default: {constants.WORK_DIR}.',
)
def bio_cli(catalog, entity, upload, sandbox, dump_wikidata, dir_io):
    """Validate identifiers against biographical data.

    Look for birth/death dates, birth/death places, gender.

    Dump 4 output files:

    1. catalog IDs to be deprecated.
    JSON format: {catalog_ID: [list of QIDs]}

    2. statements to be added.
    CSV format: QID,PID,value,catalog_ID

    3. shared statements to be referenced.
    Same format as file #2

    4. statements found in Wikidata but not in the target catalog.
    CSV format: catalog_ID,PID_URL,value,QID_URL

    You can pass the '-u' flag to upload the output to Wikidata.
    """
    criterion = 'bio'
    # Output paths
    deprecate_path = os.path.join(
        dir_io,
        IDS_TO_BE_DEPRECATED_FNAME.format(
            catalog=catalog, entity=entity, criterion=criterion
        ),
    )
    add_path = os.path.join(
        dir_io,
        BIO_STATEMENTS_TO_BE_ADDED_FNAME.format(catalog=catalog, entity=entity),
    )
    ref_path = os.path.join(
        dir_io,
        SHARED_STATEMENTS_FNAME.format(
            catalog=catalog, entity=entity, criterion=criterion
        ),
    )
    wd_stmts_path = os.path.join(
        dir_io,
        WD_STATEMENTS_FNAME.format(criterion=criterion, catalog=catalog, entity=entity),
    )
    wd_cache_path = os.path.join(
        dir_io,
        WD_CACHE_FNAME.format(catalog=catalog, entity=entity, criterion=criterion),
    )

    # Wikidata cache
    wd_cache = None
    if os.path.isfile(wd_cache_path):
        with open(wd_cache_path, 'rb') as cin:
            wd_cache = pickle.load(cin)
        LOGGER.info("Loaded Wikidata cache from '%s'", cin.name)

    # Run validation
    result = bio(catalog, entity, wd_cache=wd_cache)

    # Nothing to do: the catalog doesn't contain biographical data
    if result is None:
        return

    # Unpack the result tuple
    deprecate, add, reference, wd_stmts, wd_cache = result
    # Dump output files
    _dump_deprecated(deprecate, deprecate_path)
    _dump_csv_output(add, add_path, 'statements to be added')
    _dump_csv_output(reference, ref_path, 'shared statements to be referenced')
    _dump_csv_output(
        wd_stmts,
        wd_stmts_path,
        f'statements in Wikidata but not in {catalog} {entity}',
    )

    # Dump Wikidata cache
    if dump_wikidata:
        try:
            with open(wd_cache_path, 'wb') as cout:
                # Using the highest protocol available for the current Python
                # version should be the most efficient solution
                pickle.dump(wd_cache, cout, protocol=pickle.HIGHEST_PROTOCOL)
            LOGGER.info(
                'Biographical data gathered from Wikidata dumped to %s',
                wd_cache_path,
            )
        except MemoryError:
            LOGGER.warning('Could not pickle the Wikidata cache: memory error')

    # Upload the output to Wikidata:
    # deprecate, add, reference
    if upload:
        if sandbox:
            LOGGER.info(
                'Running on the Wikidata sandbox item %s ...',
                vocabulary.SANDBOX_2,
            )
        LOGGER.info('Starting deprecation of %s IDs ...', catalog)
        wikidata_bot.delete_or_deprecate_identifiers(
            'deprecate', catalog, entity, deprecate, sandbox
        )
        LOGGER.info('Starting addition of extra statements to Wikidata ...')
        wikidata_bot.add_people_statements(catalog, add, criterion, sandbox)
        LOGGER.info('Starting referencing of shared statements in Wikidata ...')
        wikidata_bot.add_people_statements(catalog, reference, criterion, sandbox)


[docs]def dead_ids(catalog: str, entity: str, wd_cache=None) -> Tuple[DefaultDict, Dict]: """Look for dead identifiers in Wikidata. An identifier is dead if it does not exist in the given catalog when this function is executed. Dead identifiers should be marked with a deprecated rank in Wikidata. **How it works:** 1. gather identifiers of the given catalog from relevant Wikidata items 2. look them up in the given catalog 3. if an identifier is not in the given catalog anymore, it should be deprecated :param catalog: ``{'discogs', 'imdb', 'musicbrainz'}``. A supported catalog :param entity: ``{'actor', 'band', 'director', 'musician', 'producer', 'writer', 'audiovisual_work', 'musical_work'}``. A supported entity :param wd_cache: (optional) a ``dict`` of identifiers gathered from Wikidata in a previous run :return: the ``dict`` pair of dead identifiers and identifiers gathered from Wikidata """ dead = defaultdict(set) db_entity = target_database.get_main_entity(catalog, entity) # Wikidata side if wd_cache is None: wd_ids = {} data_gathering.gather_target_ids( entity, catalog, target_database.get_catalog_pid(catalog, entity), wd_ids, ) else: wd_ids = wd_cache # Target catalog side session = DBManager.connect_to_db() try: for qid in wd_ids: for tid in wd_ids[qid][keys.TID]: existing = ( session.query(db_entity.catalog_id) .filter_by(catalog_id=tid) .count() ) if existing == 0: LOGGER.debug('%s %s identifier %s is dead', qid, catalog, tid) dead[tid].add(qid) session.commit() except SQLAlchemyError as error: LOGGER.error( "Failed query of target catalog identifiers due to %s. " "You can enable the debug log with the CLI option " "'-l soweego.validator DEBUG' for more details", error.__class__.__name__, ) LOGGER.debug(error) session.rollback() finally: session.close() LOGGER.info( 'Check completed. Target: %s %s. Total dead identifiers: %d', catalog, entity, len(dead), ) return dead, wd_ids
[docs]def bio( catalog: str, entity: str, wd_cache=None ) -> Optional[Tuple[defaultdict, Iterator, Iterator, Iterator, dict]]: """Validate identifiers against available biographical data. Look for: - birth and death dates - birth and death places - gender Also generate statements based on additional data found in the target catalog. They can be used to enrich Wikidata items. **How it works:** 1. gather data from the given catalog 2. gather data from relevant Wikidata items 3. look for shared data between pairs of Wikidata and catalog items: - when the pair does not share any data, the catalog identifier should be marked with a deprecated rank - when the catalog item has more data than the Wikidata one, it should be added to the latter :param catalog: ``{'discogs', 'imdb', 'musicbrainz'}``. A supported catalog :param entity: ``{'actor', 'band', 'director', 'musician', 'producer', 'writer', 'audiovisual_work', 'musical_work'}``. A supported entity :param wd_cache: (optional) a ``dict`` of links gathered from Wikidata in a previous run :return: 5 objects 1. ``dict`` of identifiers that should be deprecated 2. ``generator`` of statements that should be added 3. ``generator`` of shared statements that should be referenced 4. ``generator`` of statements found in Wikidata but not in the target catalog 5. ``dict`` of biographical data gathered from Wikidata or ``None`` if the target catalog has no biographical data. """ # Target catalog side first: # enable early return in case of no target data target_bio = data_gathering.gather_target_biodata(entity, catalog) if target_bio is None: return None deprecate, add = defaultdict(set), defaultdict(set) reference, wd_only = defaultdict(set), defaultdict(set) # Wikidata side if wd_cache is None: wd_bio = {} data_gathering.gather_target_ids( entity, catalog, target_database.get_catalog_pid(catalog, entity), wd_bio, ) data_gathering.gather_wikidata_biodata(wd_bio) else: wd_bio = wd_cache # Validation _validate(keys.BIODATA, wd_bio, target_bio, deprecate, add, reference, wd_only) return ( deprecate, _bio_statements_generator(add), _bio_statements_generator(reference), _bio_statements_generator(wd_only, for_catalogs=True), wd_bio, )
def _apply_url_blacklist(url_statements): LOGGER.info('Applying URL blacklist ...') initial_input_size = len(url_statements) blacklist = get_url_blacklist() # O(nm) complexity: n = len(blacklist); m = len(url_statements) # Expected order of magnitude: n = 10^2; m = 10^5 for domain in blacklist: # 10^2 url_statements = list( # Slurp the filter or it won't work filter(lambda stmt: domain not in stmt[2], url_statements) # 10^5 ) LOGGER.info( 'Filtered %.2f%% URLs', (initial_input_size - len(url_statements)) / initial_input_size * 100, ) return url_statements def _bio_statements_generator(stmts_dict, for_catalogs=False): for (qid, tid), values in stmts_dict.items(): for pid, value in values: if not for_catalogs: yield qid, pid, value, tid else: if match(constants.QID_REGEX, value): value = QID_PREFIX + value yield tid, PID_PREFIX + pid, value, QID_PREFIX + qid def _validate(criterion, wd, target_data, deprecate, add, reference, wd_only): LOGGER.info('Starting check against target %s ...', criterion) target = _prepare_target(target_data) # Large loop size: total Wikidata class instances with identifiers, # e.g., 80k musicians for qid, data in wd.items(): wd_data = data.get(criterion) # Skip when no Wikidata data if not wd_data: LOGGER.warning( 'Skipping check: no %s available in QID %s (Wikidata side)', criterion, qid, ) continue wd_tids = data[keys.TID] # 1 or tiny loop size: total target IDs per Wikidata item # (it should almost always be 1) for tid in wd_tids: if tid in target.keys(): target_data = target.get(tid) # Skip when no target data if not target_data: LOGGER.warning( 'Skipping check: no %s available in ' 'target ID %s (target side)', criterion, tid, ) continue shared_set, extra_set, wd_only_set = _compute_comparison_sets( criterion, wd_data, target_data ) if not shared_set: LOGGER.debug( 'No shared %s between %s and %s. The identifier ' 'statement should be deprecated', criterion, qid, tid, ) deprecate[tid].add(qid) else: LOGGER.debug( '%s and %s share these %s: %s', qid, tid, criterion, shared_set, ) reference[(qid, tid)].update(shared_set) if extra_set: LOGGER.debug( '%s has extra %s that should be added to %s: %s', tid, criterion, qid, extra_set, ) add[(qid, tid)].update(extra_set) else: LOGGER.debug('%s has no extra %s', tid, criterion) if wd_only_set: LOGGER.debug( '%s has %s not in %s: %s', qid, criterion, tid, wd_only_set, ) wd_only[(qid, tid)].update(wd_only_set) else: LOGGER.debug('%s has no extra %s', qid, criterion) LOGGER.info( 'Check against target %s completed: %d IDs to be deprecated, ' '%d Wikidata items with statements to be added, ' '%d Wikidata items with shared statements to be referenced, ' '%d values in Wikidata but not in the target catalog', criterion, len(deprecate), len(add), len(reference), len(wd_only), ) def _compute_comparison_sets(criterion, wd_data, target_data): if criterion == keys.LINKS: shared = wd_data.intersection(target_data) extra = target_data.difference(wd_data) wd_only = wd_data.difference(target_data) # Biographical validation requires more complex comparisons elif criterion == keys.BIODATA: # `wd_data` has either couples or triples: couples are dates wd_dates = set(filter(lambda x: len(x) == 2, wd_data)) # Don't cast to set: `wd_data` triples hold sets themselves wd_other = list(filter(lambda x: len(x) == 3, wd_data)) # In `target_data` we look for relevant date PIDs target_dates = set( filter( lambda x: x[0] in (vocabulary.DATE_OF_BIRTH, vocabulary.DATE_OF_DEATH), target_data, ) ) target_other = target_data.difference(target_dates) shared_dates, extra_dates = _compare('dates', wd_dates, target_dates) wd_only_dates = wd_dates.difference(shared_dates) shared_other, extra_other = _compare('other', wd_other, target_other) # `wd_other` has triples: build a set with couples # to directly compute the difference with `shared_other` wd_other_set = {(pid, qid) for pid, qid, _ in wd_other} wd_only_other = wd_other_set.difference(shared_other) shared = shared_dates | shared_other extra = extra_dates | extra_other wd_only = wd_only_dates | wd_only_other else: raise ValueError( f"Invalid validation criterion: '{criterion}'. " f"Please use either '{keys.LINKS}' or '{keys.BIODATA}'" ) return shared, extra, wd_only def _compare(what, wd, target): shared, extra = set(), set() # Keep track of matches to avoid useless computation # and incorrect comparisons: # this happens when WD has multiple claims with # the same property wd_matches, target_matches = [], [] # Filter missing target values (doesn't apply to dates) if what == 'other': target = filter(lambda x: x[1] is not None, target) for i, wd_elem in enumerate(wd): for j, t_elem in enumerate(target): # Don't compare when already matched if i in wd_matches or j in target_matches: continue # Don't compare different PIDs if wd_elem[0] != t_elem[0]: continue inputs = ( shared, extra, wd_matches, target_matches, i, wd_elem, j, t_elem, ) if what == 'dates': # Missing dates are unexpected: skip but warn if None in (wd_elem[1], t_elem[1]): LOGGER.warning( 'Skipping unexpected %s date pair with missing value(s)', (wd_elem, t_elem), ) continue _compare_dates(inputs) elif what == 'other': _compare_other(inputs) else: raise ValueError( f"Invalid argument: '{what}'. " "Please use either 'dates' or 'other'" ) return shared, extra def _compare_other(inputs): shared, extra, wd_matches, target_matches, i, wd_elem, j, t_elem = inputs pid, qid, wd_values = wd_elem _, t_value = t_elem # Take the lowercased normalized value # TODO improve matching _, t_normalized = text_utils.normalize(t_value) if t_normalized in wd_values: shared.add((pid, qid)) wd_matches.append(i) target_matches.append(j) else: t_qid = api_requests.resolve_qid(t_normalized) if t_qid is not None: extra.add((pid, t_qid)) def _compare_dates(inputs): shared, extra, wd_matches, target_matches, i, wd_elem, j, t_elem = inputs wd_timestamp, wd_precision = wd_elem[1].split('/') t_timestamp, t_precision = t_elem[1].split('/') shared_date, extra_date = _match_dates_by_precision( min(int(wd_precision), int(t_precision)), wd_elem, wd_timestamp, t_elem, t_timestamp, ) if shared_date is not None: shared.add(shared_date) wd_matches.append(i) target_matches.append(j) elif extra_date is not None: extra.add(extra_date) def _match_dates_by_precision(precision, wd_elem, wd_timestamp, t_elem, t_timestamp): slice_indices = { vocabulary.YEAR: 4, vocabulary.MONTH: 7, vocabulary.DAY: 10, } index = slice_indices.get(precision) if index is None: LOGGER.info( "Won't try to match date pair %s: too low or too high precision", (wd_elem, t_elem), ) return None, None shared, extra = None, None wd_simplified = wd_timestamp[:index] t_simplified = t_timestamp[:index] if wd_simplified == t_simplified: LOGGER.debug( 'Date pair %s matches on %s', (wd_timestamp, t_timestamp), (wd_simplified, t_simplified), ) # WD data has the priority shared = wd_elem else: LOGGER.debug('Target has an extra date: %s', t_timestamp) extra = t_elem return shared, extra def _dump_deprecated(data, outpath): if data: with open(outpath, 'w') as deprecated: json.dump( {target_id: list(qids) for target_id, qids in data.items()}, deprecated, indent=2, ) LOGGER.info('IDs to be deprecated dumped to %s', outpath) else: LOGGER.info("No IDs to be deprecated, won't dump to file") def _dump_csv_output(data, out_path, log_msg_subject): if data: with open(out_path, 'w') as fout: writer = csv.writer(fout) writer.writerows(data) LOGGER.info('%s dumped to %s', log_msg_subject, out_path) else: LOGGER.info("No %s, won't dump to file", log_msg_subject) def _prepare_target(dataset): target = defaultdict(set) for identifier, *data in dataset: if len(data) == 1: # Links target[identifier].add(data.pop()) else: # Biographical data target[identifier].add(tuple(data)) return target