# Source code for soweego.importer.discogs_dump_extractor
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""`Discogs <https://www.discogs.com/>`_ dump extractor."""
__author__ = 'Marco Fossati'
__email__ = 'fossati@spaziodati.eu'
__version__ = '1.0'
__license__ = 'GPL-3.0'
__copyright__ = 'Copyleft 2018, Hjfocs'
import gzip
import logging
import os
import shutil
import xml.etree.ElementTree as et
from datetime import date, datetime
from typing import Iterable, List, Optional, Tuple
from lxml import etree
from requests import get
from tqdm import tqdm
from soweego.commons import text_utils, url_utils
from soweego.commons.db_manager import DBManager
from soweego.importer.base_dump_extractor import BaseDumpExtractor
from soweego.importer.models.base_link_entity import BaseLinkEntity
from soweego.importer.models.discogs_entity import (
DiscogsArtistEntity, DiscogsGroupEntity, DiscogsGroupLinkEntity,
DiscogsGroupNlpEntity, DiscogsMasterArtistRelationship, DiscogsMasterEntity,
DiscogsMusicianEntity, DiscogsMusicianLinkEntity, DiscogsMusicianNlpEntity
)
# Module-wide logger.
LOGGER = logging.getLogger(__name__)

# Public S3 bucket hosting the Discogs data dumps.
DUMP_BASE_URL = 'https://discogs-data.s3-us-west-2.amazonaws.com/'
# Bucket listing endpoint; `{}` is filled with the current year.
DUMP_LIST_URL_TEMPLATE = DUMP_BASE_URL + '?delimiter=/&prefix=data/{}/'
class DiscogsDumpExtractor(BaseDumpExtractor):
    """Download Discogs dumps, extract data, and
    populate a database instance."""

    # Import statistics, accumulated over an extraction run
    total_entities = 0  # every persisted entity (main + NLP + name variations)
    musicians = 0
    musician_links = 0
    musician_nlp = 0
    bands = 0
    band_links = 0
    band_nlp = 0
    valid_links = 0
    dead_links = 0

    # Flush ORM entities to the database in batches of this size
    _sqlalchemy_commit_every = 100_000
[docs] def get_dump_download_urls(self) -> Optional[List[str]]:
urls = []
response = get(DUMP_LIST_URL_TEMPLATE.format(date.today().year))
root = et.fromstring(response.text)
# 4 dump files, sorted alphabetically: artists, labels, masters,
# releases
dumps = [
list(root)[-4],
list(root)[-2],
] # Take the 2nd and 4th from last child
for dump in dumps:
for child in dump:
if 'Key' in child.tag:
urls.append(DUMP_BASE_URL + child.text)
break
if not urls:
LOGGER.error(
'Failed to get the Discogs dump download URL: are we at the '
'very start of the year?'
)
return None
return urls
[docs] def extract_and_populate(self, dump_file_paths: List[str], resolve: bool) -> None:
"""Extract relevant data from the *artists* (people)
and *masters* (works) Discogs dumps, preprocess them, populate
`SQLAlchemy <https://www.sqlalchemy.org/>`_ ORM entities, and persist
them to a database instance.
See :mod:`~soweego.importer.models.discogs_entity`
for the ORM definitions.
:param dump_file_paths: paths to downloaded catalog dumps
:param resolve: whether to resolve URLs found in catalog dumps or not
"""
self._process_artists_dump(dump_file_paths[0], resolve)
self._process_masters_dump(dump_file_paths[1])
def _process_masters_dump(self, dump_file_path):
LOGGER.info("Starting import of masters from Discogs dump '%s'", dump_file_path)
start = datetime.now()
tables = [DiscogsMasterEntity, DiscogsMasterArtistRelationship]
db_manager = DBManager()
LOGGER.info('Connected to database: %s', db_manager.get_engine().url)
db_manager.drop(tables)
db_manager.create(tables)
LOGGER.info(
'SQL tables dropped and re-created: %s',
[table.__tablename__ for table in tables],
)
extracted_path = '.'.join(dump_file_path.split('.')[:-1])
# Extract dump file if it has not yet been extracted
if not os.path.exists(extracted_path):
LOGGER.info('Extracting dump file')
with gzip.open(dump_file_path, 'rb') as f_in:
with open(extracted_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
# count number of entries
n_rows = sum(1 for _ in self._g_process_et_items(extracted_path, 'master'))
session = db_manager.new_session()
entity_array = [] # array to which we'll add the entities
relationships_set = set()
self.total_entities = 0
for _, node in tqdm(
self._g_process_et_items(extracted_path, 'master'), total=n_rows
):
if not node.tag == 'master':
continue
self.total_entities += 1
entity = self._extract_from_master_node(node, relationships_set)
entity_array.append(entity)
# commit in batches of `self._sqlalchemy_commit_every`
if len(entity_array) >= self._sqlalchemy_commit_every:
LOGGER.info(
'Adding batch of entities to the database, '
'this will take a while.'
'Progress will resume soon.'
)
insert_start_time = datetime.now()
session.bulk_save_objects(entity_array)
session.commit()
session.expunge_all() # clear session
entity_array.clear() # clear entity array
LOGGER.debug(
'It took %s to add %s entities to the database',
datetime.now() - insert_start_time,
self._sqlalchemy_commit_every,
)
# finally commit remaining entities in session
# (if any), and close session
session.bulk_save_objects(entity_array)
session.bulk_save_objects(
[
DiscogsMasterArtistRelationship(id1, id2)
for id1, id2 in relationships_set
]
)
session.commit()
session.close()
end = datetime.now()
LOGGER.info(
'Import completed in %s. Total entities: %d. ' 'Total relationships %s.',
end - start,
self.total_entities,
len(relationships_set),
)
# once the import process is complete,
# we can safely delete the extracted discogs dump
os.remove(extracted_path)
@staticmethod
def _extract_from_master_node(node, relationships_set):
entity = DiscogsMasterEntity()
entity.catalog_id = node.attrib['id']
genres = set()
for child in node:
if child.tag == 'main_release':
entity.main_release_id = child.text
elif child.tag == 'genres':
for genre in child:
genres.update(text_utils.tokenize(genre.text))
elif child.tag == 'styles':
for style in child:
genres.update(text_utils.tokenize(style.text))
elif child.tag == 'title':
entity.name = child.text
entity.name_tokens = ' '.join(text_utils.tokenize(child.text))
elif child.tag == 'data_quality':
entity.data_quality = child.text.lower()
elif child.tag == 'year':
try:
entity.born = date(year=int(child.text), month=1, day=1)
entity.born_precision = 9
except ValueError:
LOGGER.debug(
'Master with id %s has an invalid year: %s',
entity.catalog_id,
child.text,
)
elif child.tag == 'artists':
for artist in child:
relationships_set.add((entity.catalog_id, artist.find('id').text))
entity.genres = ' '.join(genres)
return entity
def _extract_from_artist_node(self, node, resolve: bool) -> dict:
infos = {}
# Skip nodes without required fields
identifier = node.findtext('id')
if not identifier:
LOGGER.warning(
'Skipping import for artist node with no identifier: %s', node
)
return None
name = node.findtext('name')
if not name:
LOGGER.warning(
'Skipping import for identifier with no name: %s', identifier
)
return None
infos['identifier'] = identifier
infos['name'] = name
# Musician
groups = node.find('groups')
members = node.find('members')
if groups is not None:
infos['groups'] = groups
if members is not None:
infos['members'] = members
infos['realname'] = node.findtext('realname')
infos['data_quality'] = node.findtext('data_quality')
infos['profile'] = node.findtext('profile')
infos['namevariations'] = node.find('namevariations')
infos['living_links'] = self._extract_living_links(identifier, node, resolve)
return infos
    def _process_artists_dump(self, dump_file_path, resolve):
        """Import the *artists* dump (musicians and bands) into the database.

        Drops and re-creates the relevant tables, streams the XML dump, and
        persists entities in batches of ``_sqlalchemy_commit_every``.
        The extracted dump file is deleted once the import completes.

        :param dump_file_path: path to the downloaded gzipped artists dump
        :param resolve: whether to resolve URLs found in the dump or not
        """
        LOGGER.info(
            "Starting import of musicians and bands from Discogs dump '%s'",
            dump_file_path,
        )
        start = datetime.now()
        tables = [
            DiscogsMusicianEntity,
            DiscogsMusicianNlpEntity,
            DiscogsMusicianLinkEntity,
            DiscogsGroupEntity,
            DiscogsGroupNlpEntity,
            DiscogsGroupLinkEntity,
        ]
        db_manager = DBManager()
        LOGGER.info('Connected to database: %s', db_manager.get_engine().url)
        # Start from scratch: drop and re-create the target tables
        db_manager.drop(tables)
        db_manager.create(tables)
        LOGGER.info(
            'SQL tables dropped and re-created: %s',
            [table.__tablename__ for table in tables],
        )
        # The extracted file path is the dump path minus its '.gz' suffix
        extracted_path = '.'.join(dump_file_path.split('.')[:-1])
        # Extract dump file if it has not yet been extracted
        if not os.path.exists(extracted_path):
            LOGGER.info('Extracting dump file')
            with gzip.open(dump_file_path, 'rb') as f_in:
                with open(extracted_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
        # count number of entries (lets `tqdm` show a progress bar)
        n_rows = sum(1 for _ in self._g_process_et_items(extracted_path, 'artist'))
        session = db_manager.new_session()
        entity_array = []  # array to which we'll add the entities
        for _, node in tqdm(
            self._g_process_et_items(extracted_path, 'artist'), total=n_rows
        ):
            if not node.tag == 'artist':
                continue
            infos = self._extract_from_artist_node(node, resolve)
            if infos is None:
                # Node lacked an identifier or a name: skip it
                continue
            # Musician: the node has a <groups> element
            if 'groups' in infos:
                entity = DiscogsMusicianEntity()
                self._populate_musician(entity_array, entity, infos)
            # Band
            elif 'members' in infos:
                entity = DiscogsGroupEntity()
                self._populate_band(entity_array, entity, infos)
            # commit in batches of `self._sqlalchemy_commit_every`
            if len(entity_array) >= self._sqlalchemy_commit_every:
                LOGGER.info(
                    'Adding batch of entities to the database, '
                    'this will take a while. '
                    'Progress will resume soon.'
                )
                insert_start_time = datetime.now()
                session.bulk_save_objects(entity_array)
                session.commit()
                session.expunge_all()  # clear session
                entity_array.clear()  # clear entity array
                LOGGER.debug(
                    'It took %s to add %s entities to the database',
                    datetime.now() - insert_start_time,
                    self._sqlalchemy_commit_every,
                )
        # finally commit remaining entities in session
        # (if any), and close session
        session.bulk_save_objects(entity_array)
        session.commit()
        session.close()
        end = datetime.now()
        LOGGER.info(
            'Import completed in %s. '
            'Total entities: %d - %d musicians with %d links - %d bands'
            ' with %d links - %d discarded dead links.',
            end - start,
            self.total_entities,
            self.musicians,
            self.musician_links,
            self.bands,
            self.band_links,
            self.dead_links,
        )
        # once the import process is complete,
        # we can safely delete the extracted discogs dump
        os.remove(extracted_path)
def _populate_band(self, entity_array, entity: DiscogsGroupEntity, infos: dict):
# Main entity
self._fill_entity(entity, infos)
self.bands += 1
self.total_entities += 1
# Textual data
self._populate_nlp_entity(entity_array, infos, DiscogsGroupNlpEntity)
# Denormalized name variations
self._populate_name_variations(entity_array, infos, entity)
# Links
self._populate_links(entity_array, DiscogsGroupLinkEntity, infos)
entity_array.append(entity)
# TODO populate group -> musicians relationship table
# for member in list(members):
# get member.attrib['id']
def _populate_musician(
self, entity_array, entity: DiscogsArtistEntity, infos: dict
):
# Main entity
self._fill_entity(entity, infos)
self.musicians += 1
self.total_entities += 1
# Textual data
self._populate_nlp_entity(entity_array, infos, DiscogsMusicianNlpEntity)
# Denormalized name variations
self._populate_name_variations(entity_array, infos, entity)
# Links
self._populate_links(entity_array, DiscogsMusicianLinkEntity, infos)
entity_array.append(entity)
# TODO populate musician -> groups relationship table
# for group in list(groups):
# get group.attrib['id']
def _populate_links(self, entity_array, entity_class, infos: dict):
for link in infos['living_links']:
link_entity = entity_class()
self._fill_link_entity(link_entity, infos['identifier'], link)
entity_array.append(link_entity)
def _populate_name_variations(self, entity_array, infos: dict, current_entity):
identifier = infos['identifier']
if infos.get('namevariations') is not None:
children = list(infos['namevariations'])
if children:
for entity in self._denormalize_name_variation_entities(
current_entity, children
):
entity_array.append(entity)
else:
LOGGER.debug('Artist %s has an empty <namevariations/> tag', identifier)
else:
LOGGER.debug('Artist %s has no <namevariations> tag', identifier)
def _populate_nlp_entity(self, entity_array, infos: dict, entity_class):
if infos.get('profile'):
nlp_entity = entity_class()
nlp_entity.catalog_id = infos['identifier']
nlp_entity.description = infos['profile']
description_tokens = text_utils.tokenize(infos['profile'])
if description_tokens:
nlp_entity.description_tokens = ' '.join(description_tokens)
entity_array.append(nlp_entity)
self.total_entities += 1
if 'Musician' in entity_class.__name__:
self.musician_nlp += 1
else:
self.band_nlp += 1
else:
LOGGER.debug('Artist %s has an empty <profile/> tag', infos['identifier'])
@staticmethod
def _fill_entity(entity: DiscogsArtistEntity, infos):
# Base fields
entity.catalog_id = infos['identifier']
entity.name = infos['name']
name_tokens = text_utils.tokenize(infos['name'])
if name_tokens:
entity.name_tokens = ' '.join(name_tokens)
# Real name
real_name = infos['realname']
if real_name:
entity.real_name = real_name
else:
LOGGER.debug('Artist %s has an empty <realname/> tag', infos['identifier'])
# Data quality
data_quality = infos['data_quality']
if data_quality:
entity.data_quality = data_quality
else:
LOGGER.debug(
'Artist %s has an empty <data_quality/> tag',
infos['identifier'],
)
def _denormalize_name_variation_entities(
self, main_entity: DiscogsArtistEntity, name_variation_nodes
):
entity_class = type(main_entity)
for node in name_variation_nodes:
name_variation = node.text
if not name_variation:
LOGGER.debug(
'Artist %s: skipping empty <name> tag in <namevariations>',
main_entity.catalog_id,
)
continue
variation_entity = entity_class()
variation_entity.catalog_id = main_entity.catalog_id
variation_entity.name = name_variation
name_tokens = text_utils.tokenize(name_variation)
if name_tokens:
variation_entity.name_tokens = ' '.join(name_tokens)
variation_entity.real_name = main_entity.real_name
variation_entity.data_quality = main_entity.data_quality
self.total_entities += 1
if 'Musician' in entity_class.__name__:
self.musicians += 1
else:
self.bands += 1
yield variation_entity
def _extract_living_links(self, identifier, node, resolve: bool):
LOGGER.debug('Extracting living links from artist %s', identifier)
urls = node.find('urls')
if urls is not None:
for url_element in urls.iterfind('url'):
url = url_element.text
if not url:
LOGGER.debug('Artist %s: skipping empty <url> tag', identifier)
continue
for alive_link in self._check_link(url, resolve):
yield alive_link
def _check_link(self, link, resolve: bool):
LOGGER.debug('Processing link <%s>', link)
clean_parts = url_utils.clean(link)
LOGGER.debug('Clean link: %s', clean_parts)
for part in clean_parts:
valid = url_utils.validate(part)
if not valid:
self.dead_links += 1
continue
LOGGER.debug('Valid URL: <%s>', valid)
if not resolve:
yield valid
continue
alive = url_utils.resolve(valid)
if not alive:
self.dead_links += 1
continue
LOGGER.debug('Living URL: <%s>', alive)
self.valid_links += 1
yield alive
def _fill_link_entity(self, entity: BaseLinkEntity, identifier, url):
entity.catalog_id = identifier
entity.url = url
entity.is_wiki = url_utils.is_wiki_link(url)
entity.url_tokens = ' '.join(url_utils.tokenize(url))
if isinstance(entity, DiscogsMusicianLinkEntity):
self.musician_links += 1
elif isinstance(entity, DiscogsGroupLinkEntity):
self.band_links += 1
@staticmethod
def _g_process_et_items(path, tag) -> Iterable[Tuple]:
"""
Generator: Processes ElementTree items in a memory
efficient way
"""
context: etree.ElementTree = etree.iterparse(path, events=('end',), tag=tag)
for event, elem in context:
yield event, elem
# delete content of node once we're done processing
# it. If we don't then it would stay in memory
elem.clear()