# Source code for soweego.importer.imdb_dump_extractor

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""`IMDb <https://www.imdb.com/>`_ dump extractor."""

__author__ = 'Andrea Tupini'
__email__ = 'tupini07@gmail.com'
__version__ = '1.0'
__license__ = 'GPL-3.0'
__copyright__ = 'Copyleft 2019, tupini07'

import copy
import csv
import datetime
import gzip
import logging
from typing import Dict, Generator, List, Tuple

from tqdm import tqdm

from soweego.commons import text_utils
from soweego.commons.db_manager import DBManager
from soweego.importer.base_dump_extractor import BaseDumpExtractor
from soweego.importer.models import imdb_entity
from soweego.wikidata import vocabulary as vocab

LOGGER = logging.getLogger(__name__)

DUMP_URL_PERSON_INFO = 'https://datasets.imdbws.com/name.basics.tsv.gz'
DUMP_URL_MOVIE_INFO = 'https://datasets.imdbws.com/title.basics.tsv.gz'


class IMDbDumpExtractor(BaseDumpExtractor):
    """Download IMDb dumps, extract data, and populate a database instance."""

    # Counters
    n_actors = 0
    n_directors = 0
    n_movies = 0
    n_musicians = 0
    n_persons = 0
    n_producers = 0
    n_writers = 0
    n_misc = 0
    n_person_movie_links = 0

    # Number of pending entities that triggers a bulk insert + commit
    _sqlalchemy_commit_every = 100_000

    # IMDb professions that make a person land in the musicians table
    _MUSICIAN_PROFESSIONS = frozenset(
        ('sound_department', 'composer', 'music_department', 'soundtrack')
    )

    def get_dump_download_urls(self) -> List[str]:
        """Return the dump URLs: the *name* (people) dump first,
        then the *title* (works) one.

        :return: list of dump URLs, in the order expected by
          :meth:`extract_and_populate`
        """
        return [DUMP_URL_PERSON_INFO, DUMP_URL_MOVIE_INFO]

    @staticmethod
    def _normalize_null(entity: Dict) -> None:
        """IMDb represents a null entry with \\N , this method converts
        all \\N to None so that they're saved as null in the database.
        This is done for all 'entries' of a given entity.

        The normalization process is done *in place*, so this
        method has no return value.

        :param entity: represents the entity we want to *normalize*
        """
        # Only values of existing keys are reassigned, so the dict size
        # does not change while iterating
        for key, value in entity.items():
            if value == '\\N':
                entity[key] = None

    @staticmethod
    def _year_to_date(raw_year):
        """Convert a year taken from a dump into a date.

        IMDb only provides years, so month and day are both set to 1.

        :param raw_year: the year as found in the dump, typically a string
          or ``None``
        :return: a :class:`datetime.date` on 1 January of the given year,
          or ``None`` when the value is missing, not a number,
          or out of the range supported by :class:`datetime.date`
        """
        try:
            return datetime.date(year=int(raw_year), month=1, day=1)
        except (TypeError, ValueError):
            # TypeError: missing value (None)
            # ValueError: non-numeric string or unsupported year
            return None

    def extract_and_populate(
        self, dump_file_paths: List[str], resolve: bool
    ) -> None:
        """Extract relevant data from the *name* (people) and *title* (works)
        IMDb dumps, preprocess them, populate
        `SQLAlchemy <https://www.sqlalchemy.org/>`_ ORM entities, and persist
        them to a database instance.

        See :mod:`~soweego.importer.models.imdb_entity`
        for the ORM definitions.

        :param dump_file_paths: paths to downloaded catalog dumps
        :param resolve: whether to resolve URLs found in catalog dumps or not.
          Not used by this extractor
        """
        # the order of these files is specified in
        # `self.get_dump_download_urls`
        person_file_path = dump_file_paths[0]
        movies_file_path = dump_file_paths[1]

        LOGGER.debug('Path to movie info dump: %s', movies_file_path)
        LOGGER.debug('Path to person info dump: %s', person_file_path)

        # the movie timer also covers the re-creation of the tables
        start = datetime.datetime.now()

        tables = [
            imdb_entity.IMDbActorEntity,
            imdb_entity.IMDbDirectorEntity,
            imdb_entity.IMDbTitleEntity,
            imdb_entity.IMDbMusicianEntity,
            imdb_entity.IMDbProducerEntity,
            imdb_entity.IMDbWriterEntity,
            imdb_entity.IMDbTitleNameRelationship,
        ]

        db_manager = DBManager()
        LOGGER.info('Connected to database: %s', db_manager.get_engine().url)

        db_manager.drop(tables)
        db_manager.create(tables)

        LOGGER.info(
            'SQL tables dropped and re-created: %s',
            [table.__tablename__ for table in tables],
        )

        LOGGER.info('Starting import of movies ...')

        # Here we open the movie dump file, and add everything to the DB
        for movie_info, entity_array in self._loop_through_entities(
            movies_file_path
        ):
            self._populate_movie(movie_info, entity_array)
            self.n_movies += 1

        # mark end for movie import process
        end = datetime.datetime.now()
        LOGGER.info(
            'Movie import completed in %s. ' 'Total movies imported: %d',
            end - start,
            self.n_movies,
        )

        LOGGER.info('Starting import of people ...')

        # reset timer for persons import
        start = datetime.datetime.now()

        for person_info, entity_array in self._loop_through_entities(
            person_file_path
        ):
            # IMDb saves the list of professions as a comma separated
            # string
            professions = person_info.get('primaryProfession')

            # if person has no professions then ignore it
            if not professions:
                LOGGER.debug(
                    'Person %s has no professions', person_info.get('nconst')
                )
                continue

            # add person to every matching table
            for etype in self._entities_for_professions(
                professions.split(',')
            ):
                self._populate_person(etype, person_info, entity_array)

            # if person is known for any movies then add these to the
            # database as well
            if person_info.get('knownForTitles'):
                self.n_person_movie_links += 1
                self._populate_person_movie_relations(
                    person_info, entity_array
                )

            self.n_persons += 1

        # mark the end time for the person import process
        end = datetime.datetime.now()
        LOGGER.info(
            'Person import completed in %s. '
            'Total people imported: %d - '
            'Actors: %d - Directors: %d - Musicians: %d - '
            'Producers: %d - Writers: %d - Misc: %d',
            end - start,
            self.n_persons,
            self.n_actors,
            self.n_directors,
            self.n_musicians,
            self.n_producers,
            self.n_writers,
            self.n_misc,
        )

    def _entities_for_professions(self, professions: List[str]) -> List:
        """Create one empty ORM entity per table a person belongs to,
        and update the per-profession counters accordingly.

        Each person can be added to multiple tables in the DB,
        each table stands for one of the main professions.

        :param professions: profession names of a person, given by IMDb
        :return: list of fresh, unpopulated person entities
        """
        types_of_entities = []

        if 'actor' in professions or 'actress' in professions:
            self.n_actors += 1
            types_of_entities.append(imdb_entity.IMDbActorEntity())

        if 'director' in professions:
            self.n_directors += 1
            types_of_entities.append(imdb_entity.IMDbDirectorEntity())

        if 'producer' in professions:
            self.n_producers += 1
            types_of_entities.append(imdb_entity.IMDbProducerEntity())

        if any(prof in self._MUSICIAN_PROFESSIONS for prof in professions):
            self.n_musicians += 1
            types_of_entities.append(imdb_entity.IMDbMusicianEntity())

        if 'writer' in professions:
            self.n_writers += 1
            types_of_entities.append(imdb_entity.IMDbWriterEntity())

        # if the only profession a person has is `miscellaneous` then we
        # add it to all tables
        if professions == ['miscellaneous']:
            self.n_misc += 1
            types_of_entities = [
                imdb_entity.IMDbActorEntity(),
                imdb_entity.IMDbDirectorEntity(),
                imdb_entity.IMDbMusicianEntity(),
                imdb_entity.IMDbProducerEntity(),
                imdb_entity.IMDbWriterEntity(),
            ]

        return types_of_entities

    def _populate_movie(self, movie_info: Dict, entity_array: List) -> None:
        """Create a title entity out of a row of the *title* dump and append
        it to `entity_array`. When the original title differs from
        the primary one, an extra alias entity is appended first.

        :param movie_info: a normalized row of the *title* dump
        :param entity_array: an external array to which we'll add
          the populated entities
        """
        movie_entity = imdb_entity.IMDbTitleEntity()
        movie_entity.catalog_id = movie_info.get('tconst')
        movie_entity.title_type = movie_info.get('titleType')

        primary_title = movie_info.get('primaryTitle')
        if primary_title is not None:
            movie_entity.name = primary_title
            movie_entity.name_tokens = ' '.join(
                text_utils.tokenize(primary_title)
            )

        movie_entity.is_adult = movie_info.get('isAdult') == '1'

        # Only the year is known, hence the precision of 9
        born = self._year_to_date(movie_info.get('startYear'))
        if born is None:
            LOGGER.debug('No start year value for %s', movie_entity)
        else:
            movie_entity.born = born
            movie_entity.born_precision = 9

        died = self._year_to_date(movie_info.get('endYear'))
        if died is None:
            LOGGER.debug('No end year value for %s', movie_entity)
        else:
            movie_entity.died = died
            movie_entity.died_precision = 9

        movie_entity.runtime_minutes = movie_info.get('runtimeMinutes')

        genres = movie_info.get('genres')
        if genres:  # if movie has a genre specified
            movie_entity.genres = ' '.join(text_utils.tokenize(genres))

        # Creates entity for alias
        alias = movie_info.get('originalTitle')
        if alias is not None and movie_entity.name != alias:
            alias_entity = copy.deepcopy(movie_entity)
            alias_entity.name = alias
            alias_entity.name_tokens = ' '.join(text_utils.tokenize(alias))
            entity_array.append(alias_entity)

        entity_array.append(movie_entity)

    def _loop_through_entities(
        self, file_path: str
    ) -> Generator[Tuple[Dict, List], None, None]:
        """Generator that given an IMDb dump file (which
        should be ".tsv.gz" format) it loops through every
        entry and yields it.

        Entities appended by the consumer are persisted in batches of
        `_sqlalchemy_commit_every`; the remaining ones are persisted
        once the dump is exhausted.

        :param file_path: path to the gzipped TSV dump
        :return: a generator which yields a Tuple[entity_info, entity_array]
          the consumer of this generator will take `entity_info`, create an
          SQLAlchemy entity, and append this to the `entity_array`
        """
        db_manager = DBManager()

        # IMDb documents its dumps as UTF-8: do not rely on the locale
        with gzip.open(file_path, 'rt', encoding='utf-8') as ddump:
            session = db_manager.new_session()

            try:
                # count number of rows for TQDM, so we can display how
                # much is missing to complete the process. Then go back
                # to the start of the file with `.seek(0)`.
                # The first line is the header, not an entry
                n_rows = max(sum(1 for _ in ddump) - 1, 0)
                ddump.seek(0)

                entity_array = []
                LOGGER.debug('Dump "%s" has %d entries', file_path, n_rows)

                # NOTE(review): IMDb fields are presumably never quoted,
                # so a value starting with `"` may be misparsed by the
                # default dialect. Consider `quoting=csv.QUOTE_NONE`
                # after checking against a real dump
                reader = csv.DictReader(ddump, delimiter='\t')

                # for every entry in the file..
                for entity_info in tqdm(reader, total=n_rows):
                    # clean the entry
                    self._normalize_null(entity_info)

                    # yield the cleaned dict
                    yield entity_info, entity_array

                    # every `_sqlalchemy_commit_every` loops we commit the
                    # session to the DB. This is more efficient than
                    # committing every loop, and is not so hard on the
                    # memory requirements as would be adding everything
                    # to session and committing once the for loop is done
                    if len(entity_array) >= self._sqlalchemy_commit_every:
                        LOGGER.info(
                            'Adding batch of entities to the database, '
                            'this will take a while. '
                            'Progress will resume soon.'
                        )

                        insert_start_time = datetime.datetime.now()
                        # keep the size for logging: the array is
                        # emptied right after the commit
                        batch_size = len(entity_array)

                        session.bulk_save_objects(entity_array)
                        session.commit()
                        session.expunge_all()  # clear session

                        entity_array.clear()  # clear entity array

                        LOGGER.debug(
                            'It took %s to add %s entities to the database',
                            datetime.datetime.now() - insert_start_time,
                            batch_size,
                        )

                # commit remaining entities
                session.bulk_save_objects(entity_array)
                session.commit()

                # clear list reference since it might still be available in
                # the scope where this generator was used.
                entity_array.clear()
            finally:
                # also runs when the consumer abandons the generator
                # or when an insert fails
                session.close()

    def _populate_person(
        self,
        person_entity: imdb_entity.IMDbNameEntity,
        person_info: Dict,
        entity_array: List,
    ) -> None:
        """Given an instance of
        :class:`~soweego.importer.models.imdb_entity.IMDbNameEntity`
        this function populates its attributes according to
        the provided `person_info` dictionary. It then appends
        said instance to `entity_array`.

        :param person_entity: the entity which we want to populate
        :param person_info: the data we want to populate the
          entity with
        :param entity_array: an external array to which we'll add the
          entity once it is populated.
        """
        person_entity.catalog_id = person_info.get('nconst')
        person_entity.name = person_info.get('primaryName')

        # the name may be null in the dump: nothing to tokenize then
        if person_entity.name is not None:
            person_entity.name_tokens = ' '.join(
                text_utils.tokenize(person_entity.name)
            )

        # IMDb saves the list of professions as a comma separated string
        raw_professions = person_info.get('primaryProfession')
        professions = raw_professions.split(',') if raw_professions else []

        # If either `actor` or `actress` in primary profession
        # then we can distinguish the gender
        if 'actor' in professions:
            person_entity.gender = 'male'
        elif 'actress' in professions:
            person_entity.gender = 'female'

        # IMDb only provides us with the birth and death year of
        # a person, so this is the only one we'll take into
        # account. Month and Day are set by default to 1. The
        # base `IMDbNameEntity` defines a precision of 9 for the
        # birth and death dates, which (according to
        # `vocab.DATE_PRECISION`) means that only the year is correct.
        born = self._year_to_date(person_info.get('birthYear'))
        if born is not None:
            person_entity.born = born

        died = self._year_to_date(person_info.get('deathYear'))
        if died is not None:
            person_entity.died = died

        # The array of primary professions gets translated to a list
        # of the QIDs that represent said professions in Wikidata
        if professions:
            # get QIDs of occupations for person
            translated_occupations = self._translate_professions(professions)

            # only save those occupations which are not the main
            # occupation of the entity type (ie, for ActorEntity
            # don't include 'actor' occupation since it is implicit)
            person_entity.occupations = ' '.join(
                occ
                for occ in translated_occupations
                if occ != person_entity.table_occupation
            )

        entity_array.append(person_entity)

    @staticmethod
    def _populate_person_movie_relations(
        person_info: Dict, entity_array: List
    ) -> None:
        """Given a `person_info` we extract the ID that the person has
        in IMDb and the IDs of the movies for which this person
        is known (which also come from IMDb). We append a
        :class:`~soweego.importer.models.imdb_entity.IMDbTitleNameRelationship`
        entity to `entity_array` for each relation.

        :param person_info: dictionary that contains the IMDb person ID and
          the IMDb movie IDs (for movies the specific person is known).
          The movie IDs are a comma separated string
        :param entity_array: an external array to which we'll add the
          person-movie relations.
        """
        known_for_titles = person_info.get('knownForTitles')

        # nothing to link when the person is not known for any title
        if not known_for_titles:
            return

        person_id = person_info.get('nconst')
        for title in known_for_titles.split(','):
            entity_array.append(
                imdb_entity.IMDbTitleNameRelationship(
                    from_catalog_id=title, to_catalog_id=person_id
                )
            )

    @staticmethod
    def _translate_professions(professions: List[str]) -> List[str]:
        """Gets the list of professions (as a list of strings) directly
        from IMDb and translates these to a list of Wikidata QIDs for
        each specific profession. Unmappable professions
        (like `miscellaneous`) are removed.

        The actual QIDs and the dictionary where this mapping is
        encoded can both be found in :mod:`soweego.wikidata.vocabulary`

        :param professions: list of profession names, given by IMDb
        :return: list of QIDs for said professions
        """
        mapped = (
            vocab.IMDB_PROFESSIONS_MAPPING.get(prof) for prof in professions
        )
        return [qid for qid in mapped if qid]