#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""`MusicBrainz <https://musicbrainz.org/>`_ dump extractor."""
__author__ = 'Massimo Frasson'
__email__ = 'maxfrax@gmail.com'
__version__ = '1.0'
__license__ = 'GPL-3.0'
__copyright__ = 'Copyleft 2018, MaxFrax96'
import logging
import os
import shutil
import tarfile
from collections import defaultdict
from csv import DictReader
from datetime import date, datetime
from typing import List, Tuple
import requests
from sqlalchemy.exc import IntegrityError
from tqdm import tqdm
from soweego.commons import text_utils, url_utils
from soweego.commons.db_manager import DBManager
from soweego.commons.utils import count_num_lines_in_file
from soweego.importer.base_dump_extractor import BaseDumpExtractor
from soweego.importer.models.base_entity import BaseEntity
from soweego.importer.models.musicbrainz_entity import (
MusicBrainzArtistBandRelationship, MusicBrainzArtistEntity,
MusicBrainzArtistLinkEntity, MusicBrainzBandEntity, MusicBrainzBandLinkEntity,
MusicBrainzReleaseGroupArtistRelationship, MusicBrainzReleaseGroupEntity,
MusicBrainzReleaseGroupLinkEntity
)
from soweego.wikidata.sparql_queries import external_id_pids_and_urls
LOGGER = logging.getLogger(__name__)
class MusicBrainzDumpExtractor(BaseDumpExtractor):
"""Download MusicBrainz dumps, extract data, and
populate a database instance."""
_sqlalchemy_commit_every = 100_000
def get_dump_download_urls(self) -> List[str]:
"""Return the download URL of the latest MusicBrainz full-export dump,
as a one-element list."""
base_url = 'http://ftp.musicbrainz.org/pub/musicbrainz/data/fullexport'
latest_version = requests.get(f'{base_url}/LATEST').text.rstrip()
return [f'{base_url}/{latest_version}/mbdump.tar.bz2']
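# A minimal usage sketch (the dump path below is made up,
# for illustration only):
#
#   extractor = MusicBrainzDumpExtractor()
#   urls = extractor.get_dump_download_urls()
#   # -> ['http://ftp.musicbrainz.org/pub/musicbrainz/data/fullexport/<LATEST>/mbdump.tar.bz2']
#   extractor.extract_and_populate(['/path/to/mbdump.tar.bz2'], resolve=False)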
def extract_and_populate(self, dump_file_paths: List[str], resolve: bool):
"""Extract relevant data from the *artist* (people) and *release group*
(works) MusicBrainz dumps, preprocess them, populate
`SQLAlchemy <https://www.sqlalchemy.org/>`_ ORM entities, and persist
them to a database instance.
See :mod:`~soweego.importer.models.musicbrainz_entity`
for the ORM definitions.
:param dump_file_paths: paths to downloaded catalog dumps
:param resolve: whether to resolve URLs found in catalog dumps or not
"""
dump_file_path = dump_file_paths[0]
dump_path = os.path.join(
os.path.dirname(os.path.abspath(dump_file_path)),
f"{os.path.basename(dump_file_path)}_extracted",
)
if not os.path.isdir(dump_path):
with tarfile.open(dump_file_path, "r:bz2") as tar:
LOGGER.info("Extracting dump %s in %s", dump_file_path, dump_path)
tar.extractall(dump_path)
LOGGER.info("Extracted dump %s in %s", dump_file_path, dump_path)
db_manager = DBManager()
tables = [MusicBrainzReleaseGroupEntity]
db_manager.drop(tables)
db_manager.create(tables)
LOGGER.info(
"Dropped and created tables %s",
[table.__tablename__ for table in tables],
)
LOGGER.info("Importing release groups")
release_groups_count = self._add_entities_from_generator(
db_manager, self._release_group_generator, dump_path
)
LOGGER.debug("Added %s/%s release group records", *release_groups_count)
def release_artist_relationships_uniqueness_filter():
"""Remove duplicates from
_release_group_artist_relationship_generator"""
yield from [
MusicBrainzReleaseGroupArtistRelationship(item[0], item[1])
for item in set(
self._release_group_artist_relationship_generator(dump_path)
)
]
tables = [MusicBrainzReleaseGroupArtistRelationship]
db_manager.drop(tables)
db_manager.create(tables)
LOGGER.info(
"Dropped and created tables %s",
[table.__tablename__ for table in tables],
)
LOGGER.info("Importing relationships release-artist/band")
relationships_count = self._add_entities_from_generator(
db_manager, release_artist_relationships_uniqueness_filter
)
LOGGER.debug("Added %s/%s relationships records", *relationships_count)
tables = [MusicBrainzReleaseGroupLinkEntity]
db_manager.drop(tables)
db_manager.create(tables)
LOGGER.info(
"Dropped and created tables %s",
[table.__tablename__ for table in tables],
)
LOGGER.info("Importing release groups links")
link_count = self._add_entities_from_generator(
db_manager, self._release_group_link_generator, dump_path, resolve
)
LOGGER.debug("Added %s/%s release group link records", *link_count)
tables = [MusicBrainzArtistEntity, MusicBrainzBandEntity]
db_manager.drop(tables)
db_manager.create(tables)
LOGGER.info(
"Dropped and created tables %s",
[table.__tablename__ for table in tables],
)
LOGGER.info("Importing artists and bands")
artist_count = self._add_entities_from_generator(
db_manager, self._artist_generator, dump_path
)
LOGGER.debug("Added %s/%s artist records", *artist_count)
tables = [MusicBrainzArtistLinkEntity, MusicBrainzBandLinkEntity]
db_manager.drop(tables)
db_manager.create(tables)
LOGGER.info(
"Dropped and created tables %s",
[table.__tablename__ for table in tables],
)
LOGGER.info("Importing links")
link_count = self._add_entities_from_generator(
db_manager, self._artist_link_generator, dump_path, resolve
)
LOGGER.debug("Added %s/%s link records", *link_count)
LOGGER.info("Importing ISNIs")
isni_link_count = self._add_entities_from_generator(
db_manager, self._isni_link_generator, dump_path, resolve
)
LOGGER.debug("Added %s/%s ISNI link records", *isni_link_count)
tables = [MusicBrainzArtistBandRelationship]
db_manager.drop(tables)
db_manager.create(tables)
LOGGER.info(
"Dropped and created tables %s",
[table.__tablename__ for table in tables],
)
LOGGER.info("Importing relationships artist-band")
def artist_band_relationships_uniqueness_filter():
yield from [
MusicBrainzArtistBandRelationship(item[0], item[1])
for item in set(self._artist_band_relationship_generator(dump_path))
]
relationships_count = self._add_entities_from_generator(
db_manager, artist_band_relationships_uniqueness_filter
)
LOGGER.debug("Added %s/%s relationships records", *relationships_count)
shutil.rmtree(dump_path, ignore_errors=True)
def _add_entities_from_generator(
self, db_manager, generator_, *args
) -> Tuple[int, int]:
"""
Add all the entities yielded by a generator to the DB.
:return: a tuple with the number of entities added to the DB,
followed by the total number of entities yielded
"""
# We keep track of both the total number of entities the
# generator yields and the number of entities actually added
# to the database. If everything goes well, these two
# numbers should match
n_total_entities = 0
n_added_entities = 0
session = db_manager.new_session()
entity_array = [] # array to which we'll add the entities
# the generator will give us a new entity each loop
# so we just add this to the `entity_array` and commit
# it once it is large enough (self._sqlalchemy_commit_every)
for entity in generator_(*args):
try:
n_total_entities += 1
entity_array.append(entity)
# Commit entities to the DB in batches:
# it is more efficient
if len(entity_array) >= self._sqlalchemy_commit_every:
LOGGER.info(
"Adding batch of entities to the database, "
"this will take a while. Progress will "
"resume soon."
)
insert_start_time = datetime.now()
session.bulk_save_objects(entity_array)
session.commit()
session.expunge_all()  # clear session
LOGGER.debug(
"It took %s to add %s entities to the database",
datetime.now() - insert_start_time,
len(entity_array),
)
entity_array.clear()  # clear entity array
n_added_entities += 1
except IntegrityError as i:
LOGGER.warning(str(i))
# finally, commit remaining entities in session
session.bulk_save_objects(entity_array)
session.commit()
# and close session
session.close()
return n_added_entities, n_total_entities
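# For illustration, a (made-up) return value of (99_000, 100_000)
# would mean the generator yielded 100_000 entities,
# 99_000 of which were actually added to the database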
@staticmethod
def _get_urls_for_entity_id(dump_path: str, l_path: str, resolve: bool) -> dict:
"""given a l_{something}_url relationship file, return a dict of
somethingid-[urls]"""
LOGGER.info(f"Loading %s relationships", l_path)
urlid_entityid_relationship = {}
with open(l_path, "r") as tsvfile:
url_relationships = DictReader(
tsvfile, delimiter='\t', fieldnames=list(range(6))
)
for relationship in tqdm(
url_relationships, total=count_num_lines_in_file(tsvfile)
):
# Match each URL ID with its entity ID
if relationship[3] in urlid_entityid_relationship:
LOGGER.warning(
'URL with ID %s is linked to multiple entities; '
'only one will be stored',
relationship[3],
)
else:
urlid_entityid_relationship[relationship[3]] = relationship[2]
url_path = os.path.join(dump_path, 'mbdump', 'url')
url_entityid = {}
LOGGER.info('Checking URLs related to the entities')
# Translate URL IDs into the corresponding URLs
with open(url_path, "r") as tsvfile:
urls = DictReader(
tsvfile, delimiter='\t', fieldnames=list(range(5))
)
for url_record in tqdm(urls, total=count_num_lines_in_file(tsvfile)):
urlid = url_record[0]
if urlid in urlid_entityid_relationship:
for candidate_url in url_utils.clean(url_record[2]):
if not url_utils.validate(candidate_url):
continue
if resolve and not url_utils.resolve(candidate_url):
continue
url_entityid[candidate_url] = urlid_entityid_relationship[urlid]
del urlid_entityid_relationship[urlid]
entityid_url = defaultdict(list)
# Invert the dictionary: group URLs by entity ID
for url, entityid in url_entityid.items():
entityid_url[entityid].append(url)
return entityid_url
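# Toy example of the flow above (IDs and URLs are made up):
# the l_{something}_url file maps URL IDs to entity IDs,
#   urlid_entityid_relationship == {'7': '42'}
# the url file resolves URL ID '7' into an actual address,
#   url_entityid == {'https://example.org/foo': '42'}
# and the final inversion groups URLs per entity:
#   entityid_url == {'42': ['https://example.org/foo']}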
def _artist_link_generator(self, dump_path: str, resolve: bool):
l_artist_url_path = os.path.join(dump_path, 'mbdump', 'l_artist_url')
# Load all the relationships between URL IDs and artist IDs
artistid_url = self._get_urls_for_entity_id(
dump_path, l_artist_url_path, resolve
)
LOGGER.info('Adding link entities to DB')
# Translate artist IDs into the corresponding artists
artist_path = os.path.join(dump_path, 'mbdump', 'artist')
with open(artist_path, 'r') as artistfile:
n_rows = count_num_lines_in_file(artistfile)
artist_link_reader = DictReader(
artistfile,
delimiter='\t',
fieldnames=[
'id',
'gid',
'label',
'sort_label',
'b_year',
'b_month',
'b_day',
'd_year',
'd_month',
'd_day',
'type_id',
],
)
for artist in tqdm(artist_link_reader, total=n_rows):
if artist['id'] in artistid_url:
for link in artistid_url[artist['id']]:
if self._check_person(artist['type_id']):
current_entity = MusicBrainzArtistLinkEntity()
self._fill_link_entity(current_entity, artist['gid'], link)
yield current_entity
if self._check_band(artist['type_id']):
current_entity = MusicBrainzBandLinkEntity()
self._fill_link_entity(current_entity, artist['gid'], link)
yield current_entity
def _release_group_link_generator(self, dump_path: str, resolve: bool):
l_release_group_url_path = os.path.join(
dump_path, 'mbdump', 'l_release_group_url'
)
release_group_id_urls = self._get_urls_for_entity_id(
dump_path, l_release_group_url_path, resolve
)
release_group_path = os.path.join(dump_path, 'mbdump', 'release_group')
with open(release_group_path) as rfile:
n_rows = count_num_lines_in_file(rfile)
releases = DictReader(
rfile, delimiter='\t', fieldnames=['id', 'gid', 'label']
)
for release in tqdm(releases, total=n_rows):
if release['id'] in release_group_id_urls:
for link in release_group_id_urls[release['id']]:
entity = MusicBrainzReleaseGroupLinkEntity()
self._fill_link_entity(entity, release['gid'], link)
yield entity
def _isni_link_generator(self, dump_path: str, resolve: bool):
isni_file_path = os.path.join(dump_path, 'mbdump', 'artist_isni')
artist_link = {}
done = False
for result in external_id_pids_and_urls():
if done:
break
for pid, formatter in result.items():
if pid != 'P213':
continue
for url_formatter, _ in formatter.items():
with open(isni_file_path, 'r') as artistfile:
for artistid_isni in DictReader(
artistfile,
delimiter='\t',
fieldnames=['id', 'isni'],
):
# If the ISNI is valid, generate a URL out of it
artistid = artistid_isni['id']
isni = artistid_isni['isni']
link = url_formatter.replace('$1', isni)
for candidate_url in url_utils.clean(link):
if not url_utils.validate(candidate_url):
continue
if resolve and not url_utils.resolve(candidate_url):
continue
artist_link[artistid] = candidate_url
done = True
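# For instance, assuming the P213 formatter retrieved above is
# 'https://isni.org/isni/$1' (an assumption: the actual value comes
# from the Wikidata SPARQL query), the dump row ('123', '0000000122819551')
# would set artist_link['123'] = 'https://isni.org/isni/0000000122819551',
# provided the URL survives cleaning, validation, and optional resolution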
artist_path = os.path.join(dump_path, 'mbdump', 'artist')
with open(artist_path, 'r') as artistfile:
n_rows = count_num_lines_in_file(artistfile)
artist_isni_reader = DictReader(
artistfile,
delimiter='\t',
fieldnames=[
'id',
'gid',
'label',
'sort_label',
'b_year',
'b_month',
'b_day',
'd_year',
'd_month',
'd_day',
'type_id',
],
)
for artist in tqdm(artist_isni_reader, total=n_rows):
try:
# Check whether the artist has an ISNI
link = artist_link[artist['id']]
if self._check_person(artist['type_id']):
current_entity = MusicBrainzArtistLinkEntity()
self._fill_link_entity(current_entity, artist['gid'], link)
yield current_entity
if self._check_band(artist['type_id']):
current_entity = MusicBrainzBandLinkEntity()
self._fill_link_entity(current_entity, artist['gid'], link)
yield current_entity
except KeyError:
continue
def _artist_generator(self, dump_path):
artist_alias_path = os.path.join(dump_path, 'mbdump', 'artist_alias')
artist_path = os.path.join(dump_path, 'mbdump', 'artist')
area_path = os.path.join(dump_path, 'mbdump', 'area')
aliases = defaultdict(list)
areas = {}
LOGGER.info('Getting artist aliases')
# Key is the entity ID, value is its list of aliases
with open(artist_alias_path, 'r') as aliasesfile:
for alias in DictReader(
aliasesfile,
delimiter='\t',
fieldnames=['id', 'parent_id', 'label'],
):
aliases[alias['parent_id']].append(alias['label'])
LOGGER.info('Getting area IDs and related names')
# Key is the area internal id, value is the name
with open(area_path, 'r') as areafile:
for area in DictReader(
areafile, delimiter='\t', fieldnames=['id', 'gid', 'name']
):
areas[area['id']] = area['name'].lower()
LOGGER.info('Importing artist entities into DB')
with open(artist_path, 'r') as artistfile:
n_rows = count_num_lines_in_file(artistfile)
artist_reader = DictReader(
artistfile,
delimiter='\t',
fieldnames=[
'id',
'gid',
'label',
'sort_label',
'b_year',
'b_month',
'b_day',
'd_year',
'd_month',
'd_day',
'type_id',
'area',
'gender',
'ND1',
'ND2',
'ND3',
'ND4',
'b_place',
'd_place',
],
)
for artist in tqdm(artist_reader, total=n_rows):
if self._check_person(artist['type_id']):
current_entity = MusicBrainzArtistEntity()
try:
self._fill_entity(current_entity, artist, areas)
current_entity.gender = self._artist_gender(artist['gender'])
except KeyError:
LOGGER.error('Wrong gender code: %s', artist)
continue
# Create an entity for each available alias
for alias in self._alias_entities(
current_entity,
MusicBrainzArtistEntity,
aliases[artist['id']],
):
alias.gender = current_entity.gender
yield alias
yield current_entity
if self._check_band(artist['type_id']):
current_entity = MusicBrainzBandEntity()
try:
self._fill_entity(current_entity, artist, areas)
except ValueError:
LOGGER.error('Wrong date: %s', artist)
continue
# Create an entity for each available alias
for alias in self._alias_entities(
current_entity,
MusicBrainzBandEntity,
aliases[artist['id']],
):
yield alias
yield current_entity
@staticmethod
def _artist_band_relationship_generator(dump_path):
link_types = {'855', '103', '305', '965', '895'}
link_file_path = os.path.join(dump_path, 'mbdump', 'link')
to_invert = set()
LOGGER.info('Loading artist-band relationships')
# Map each relevant link ID to its link type, so that
# relationships to invert can be detected later on
links = {}
with open(link_file_path) as link_file:
reader = DictReader(
link_file, delimiter='\t', fieldnames=['id', 'link_type']
)
for row in reader:
if row['link_type'] in link_types:
links[row['id']] = row['link_type']
artists_relationship_file = os.path.join(dump_path, 'mbdump', 'l_artist_artist')
ids_translator = {}
relationships = []
with open(artists_relationship_file) as relfile:
reader = DictReader(
relfile,
delimiter='\t',
fieldnames=['id', 'link_id', 'entity0', 'entity1'],
)
for row in reader:
link_id = row['link_id']
if link_id in links:
en0 = row['entity0']
en1 = row['entity1']
ids_translator[en0] = ''
ids_translator[en1] = ''
relationship = (en0, en1)
relationships.append(relationship)
# Relationships whose link type is '855' must be inverted
if links[link_id] == '855':
to_invert.add(relationship)
# Drop the reference so the garbage collector can free the memory
links = None
artist_path = os.path.join(dump_path, 'mbdump', 'artist')
with open(artist_path, 'r') as artistfile:
for artist in DictReader(
artistfile, delimiter='\t', fieldnames=['id', 'gid']
):
if artist['id'] in ids_translator:
ids_translator[artist['id']] = artist['gid']
LOGGER.info('Adding relationships into DB')
for relation in tqdm(relationships):
translation0, translation1 = (
ids_translator[relation[0]],
ids_translator[relation[1]],
)
if translation0 and translation1:
if relation in to_invert:
yield (translation1, translation0)
else:
yield (translation0, translation1)
else:
LOGGER.warning(
"Artist id missing translation: %s to (%s, %s)",
relation,
translation0,
translation1,
)
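# Toy example of the yielded tuples (made-up IDs): an l_artist_artist
# row with entity0='5', entity1='9', and a link of type '855' yields
# (gid_of_9, gid_of_5), i.e., the inverted pair; any other accepted
# link type yields (gid_of_5, gid_of_9)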
def _release_group_generator(self, dump_path):
release_group_datesprec = self._retrieve_release_group_dates(dump_path)
release_group_path = os.path.join(dump_path, 'mbdump', 'release_group')
with open(release_group_path, 'r') as releasefile:
release_reader = DictReader(
releasefile,
delimiter='\t',
fieldnames=['id', 'gid', 'label', 'artist_credit', 'type_id'],
)
for row in tqdm(release_reader, total=count_num_lines_in_file(releasefile)):
entity = MusicBrainzReleaseGroupEntity()
self._fill_entity(entity, row, None)
if row['id'] in release_group_datesprec:
dateprec = release_group_datesprec[row['id']]
if dateprec[1] != 0:
entity.born_precision = dateprec[1]
entity.born = dateprec[0]
yield entity
@staticmethod
def _release_group_artist_relationship_generator(dump_path):
release_group_path = os.path.join(dump_path, 'mbdump', 'release_group')
artist_credit_release = defaultdict(list)
with open(release_group_path, 'r') as releasefile:
n_rows = count_num_lines_in_file(releasefile)
release_reader = DictReader(
releasefile,
delimiter='\t',
fieldnames=['id', 'gid', 'label', 'artist_credit', 'type_id'],
)
for row in tqdm(release_reader, total=n_rows):
artist_credit_release[row['artist_credit']].append(row['gid'])
artist_credit_name_path = os.path.join(
dump_path, 'mbdump', 'artist_credit_name'
)
artist_id_release = defaultdict(list)
with open(artist_credit_name_path) as artistcreditfile:
artist_credit_reader = DictReader(
artistcreditfile,
delimiter='\t',
fieldnames=['id', 'nd', 'artist_id', 'artist_name'],
)
n_rows = count_num_lines_in_file(artistcreditfile)
for row in tqdm(artist_credit_reader, total=n_rows):
artist_id_release[row['artist_id']] = artist_credit_release[row['id']]
# Free memory as we go, for performance
del artist_credit_release[row['id']]
artist_path = os.path.join(dump_path, 'mbdump', 'artist')
with open(artist_path, 'r') as artistfile:
n_rows = count_num_lines_in_file(artistfile)
artist_link_reader = DictReader(
artistfile,
delimiter='\t',
fieldnames=[
'id',
'gid',
'label',
'sort_label',
'b_year',
'b_month',
'b_day',
'd_year',
'd_month',
'd_day',
'type_id',
],
)
for artist in tqdm(artist_link_reader, total=n_rows):
for release_id in artist_id_release[artist['id']]:
yield (release_id, artist['gid'])
# Free memory as we go, for performance
del artist_id_release[artist['id']]
def _fill_entity(self, entity, info, areas):
entity.catalog_id = info['gid']
entity.name = info['label']
name_tokens = text_utils.tokenize(info['label'])
if name_tokens:
entity.name_tokens = ' '.join(name_tokens)
try:
birth_date = self._get_date_and_precision(
info['b_year'], info['b_month'], info['b_day']
)
entity.born = birth_date[0]
entity.born_precision = birth_date[1]
except KeyError:
entity.born = None
entity.born_precision = None
try:
death_date = self._get_date_and_precision(
info['d_year'], info['d_month'], info['d_day']
)
entity.died = death_date[0]
entity.died_precision = death_date[1]
except KeyError:
entity.died = None
entity.died_precision = None
if isinstance(entity, (MusicBrainzArtistEntity, MusicBrainzBandEntity)):
try:
entity.birth_place = areas[info['b_place']]
except KeyError:
entity.birth_place = None
try:
entity.death_place = areas[info['d_place']]
except KeyError:
entity.death_place = None
@staticmethod
def _fill_link_entity(entity, gid, link):
entity.catalog_id = gid
entity.url = link
entity.is_wiki = url_utils.is_wiki_link(link)
url_tokens = url_utils.tokenize(link)
if url_tokens:
entity.url_tokens = ' '.join(url_tokens)
@staticmethod
def _alias_entities(entity: BaseEntity, aliases_class, aliases: List[str]):
for alias_label in aliases:
alias_entity = aliases_class()
alias_entity.catalog_id = entity.catalog_id
alias_entity.born = entity.born
alias_entity.born_precision = entity.born_precision
alias_entity.died = entity.died
alias_entity.died_precision = entity.died_precision
alias_entity.birth_place = entity.birth_place
alias_entity.death_place = entity.death_place
alias_entity.name = alias_label
name_tokens = text_utils.tokenize(alias_label)
if name_tokens:
alias_entity.name_tokens = ' '.join(name_tokens)
yield alias_entity
@staticmethod
def _get_date_and_precision(year, month, day):
date_list = [year, month, day]
precision = -1
try:
if date_list[0] != '\\N' and int(date_list[0]) < 0:
LOGGER.warning(
'Failed to convert date (%s/%s/%s): '
'encountered a negative year, '
"which Python's date object does not support",
*date_list,
)
# We can't parse the date,
# so we treat it as if it weren't available
date_list[0] = '\\N'
null_index = date_list.index('\\N')
precision = 8 + null_index if null_index > 0 else -1
except ValueError:
precision = 11
date_list = ['0001' if i == '\\N' else i for i in date_list]
if precision == -1:
return None, None
return (
date(int(date_list[0]), int(date_list[1]), int(date_list[2])),
precision,
)
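# Worked examples of the precision codes above ('\\N' marks a missing
# field in the dump; 9, 10, and 11 stand for year, month, and day
# precision, respectively):
#
#   _get_date_and_precision('1969', '\\N', '\\N')  # (date(1969, 1, 1), 9)
#   _get_date_and_precision('1969', '7', '\\N')    # (date(1969, 7, 1), 10)
#   _get_date_and_precision('1969', '7', '20')     # (date(1969, 7, 20), 11)
#   _get_date_and_precision('\\N', '\\N', '\\N')   # (None, None)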
@staticmethod
def _check_person(type_code):
# person, character
return type_code in ['1', '4']
@staticmethod
def _check_band(type_code):
# group, orchestra, choir
return type_code in ['2', '5', '6']
@staticmethod
def _artist_gender(gender_code):
genders = {'1': 'male', '2': 'female'}
return genders.get(gender_code)
def _retrieve_release_group_dates(self, dump_path):
release_dateprec = defaultdict(lambda: (date.today(), 0))
release_country_path = os.path.join(dump_path, 'mbdump', 'release_country')
with open(release_country_path) as rfile:
releases = DictReader(
rfile,
delimiter='\t',
fieldnames=['release_id', 'country_id', 'year', 'month', 'day'],
)
for release in releases:
date_prec = self._get_date_and_precision(
release['year'], release['month'], release['day']
)
if date_prec[0] is None:
continue
if date_prec[0] < release_dateprec[release['release_id']][0]:
release_dateprec[release['release_id']] = date_prec
release_country_path = os.path.join(
dump_path, 'mbdump', 'release_unknown_country'
)
with open(release_country_path) as rfile:
releases = DictReader(
rfile,
delimiter='\t',
fieldnames=['release_id', 'year', 'month', 'day'],
)
for release in releases:
date_prec = self._get_date_and_precision(
release['year'], release['month'], release['day']
)
if date_prec[0] is None:
continue
if date_prec[0] < release_dateprec[release['release_id']][0]:
release_dateprec[release['release_id']] = date_prec
release_group_dateprec = defaultdict(lambda: (date.today(), 0))
release_path = os.path.join(dump_path, 'mbdump', 'release')
with open(release_path) as rfile:
releases = DictReader(
rfile,
delimiter='\t',
fieldnames=[
'release_id',
'gid',
'name',
'credits',
'release_group_id',
],
)
for release in releases:
if (
release_dateprec[release['release_id']][0]
< release_group_dateprec[release['release_group_id']][0]
):
release_group_dateprec[
release['release_group_id']
] = release_dateprec[release['release_id']]
return release_group_dateprec
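# Toy example of the passes above (made-up IDs): if release '1'
# came out in two countries on 1999-05-01 and 1998-03-07, the earlier
# (date(1998, 3, 7), 11) pair wins; if release '1' belongs to release
# group '10', the group then inherits that pair, unless another of its
# releases is older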