diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2019-01-22 19:19:31 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-01-22 19:19:31 -0800 |
commit | 83d5c49f5093b1820b625e1b3a1e21fc7242f79e (patch) | |
tree | 6f45ed910c0d4f589739da3584c72ca722d5b9a9 /python/fatcat_tools/importers | |
parent | a2086616c23320153eacec7e4f0d3c6e1c6d7790 (diff) | |
download | fatcat-83d5c49f5093b1820b625e1b3a1e21fc7242f79e.tar.gz fatcat-83d5c49f5093b1820b625e1b3a1e21fc7242f79e.zip |
refactored crossref importer to new style
Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r-- | python/fatcat_tools/importers/__init__.py | 6 | ||||
-rw-r--r-- | python/fatcat_tools/importers/common.py | 124 | ||||
-rw-r--r-- | python/fatcat_tools/importers/crossref.py | 125 |
3 files changed, 166 insertions, 89 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index f2caca5c..7b20868c 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -12,11 +12,11 @@ To run an import you combine two classes; one each of: """ -from .common import FatcatImporter, make_kafka_consumer +from .common import FatcatImporter, JsonLinePusher, make_kafka_consumer from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP from .grobid_metadata import GrobidMetadataImporter from .journal_metadata import JournalMetadataImporter from .matched import MatchedImporter from .orcid import OrcidImporter -from .kafka_source import KafkaSource -from .file_source import FileSource +#from .kafka_source import KafkaSource +#from .file_source import FileSource diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 25ee4727..604aa78b 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -31,10 +31,13 @@ class EntityImporter: This class exposes helpers for implementations: self.api - self.create_related_*(entity) for all entity types + self.create_<entity>(entity) -> EntityEdit + for related entity types self.push_entity(entity) - self.counts['exits'] += 1 (if didn't update or insert because of existing) - self.counts['update'] += 1 (if updated an entity) + self.counts['exits'] += 1 + if didn't update or insert because of existing) + self.counts['update'] += 1 + if updated an entity """ def __init__(self, api, **kwargs): @@ -53,14 +56,20 @@ class EntityImporter: self._editgroup_id = None self._entity_queue = [] + self._issnl_id_map = dict() + self._orcid_id_map = dict() + self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$") + self._doi_id_map = dict() + def push_record(self, raw_record): """ Returns nothing. """ - if (not raw_record) or (not self.want(raw_record): + if (not raw_record) or (not self.want(raw_record)): self.counts['skip'] += 1 return entity = self.parse_record(raw_record) + assert entity if self.bezerk_mode: self.push_entity(entity) return @@ -68,7 +77,7 @@ class EntityImporter: self.push_entity(entity) return - def finish(self, raw_record): + def finish(self): if self._edit_count > 0: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None @@ -79,8 +88,9 @@ class EntityImporter: self.counts['insert'] += len(_entity_queue) self._entity_queue = 0 - self.counts['total'] = counts['skip'] + counts['insert'] + \ - counts['update'] + counts['exists'] + self.counts['total'] = 0 + for key in ('skip', 'insert', 'update', 'exists'): + self.counts['total'] += self.counts[key] return self.counts def _get_editgroup(self, edits=1): @@ -100,8 +110,8 @@ class EntityImporter: def create_container(self, entity): eg = self._get_editgroup() - self.api.create_container(entity, editgroup_id=eg.editgroup_id) self.counts['sub.container'] += 1 + return self.api.create_container(entity, editgroup_id=eg.editgroup_id) def updated(self): """ @@ -147,6 +157,79 @@ class EntityImporter: def insert_batch(self, raw_record): raise NotImplementedError + def is_orcid(self, orcid): + return self._orcid_regex.match(orcid) is not None + + def lookup_orcid(self, orcid): + """Caches calls to the Orcid lookup API endpoint in a local dict""" + if not self.is_orcid(orcid): + return None + if orcid in self._orcid_id_map: + return self._orcid_id_map[orcid] + creator_id = None + try: + rv = self.api.lookup_creator(orcid=orcid) + creator_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._orcid_id_map[orcid] = creator_id # might be None + return creator_id + + def is_doi(self, doi): + return doi.startswith("10.") and doi.count("/") >= 1 + + def lookup_doi(self, doi): + """Caches calls to the doi lookup API endpoint in a local dict""" + assert self.is_doi(doi) + doi = doi.lower() + if doi in self._doi_id_map: + return self._doi_id_map[doi] + release_id = None + try: + rv = self.api.lookup_release(doi=doi) + release_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._doi_id_map[doi] = release_id # might be None + return release_id + + def is_issnl(self, issnl): + return len(issnl) == 9 and issnl[4] == '-' + + def lookup_issnl(self, issnl): + """Caches calls to the ISSN-L lookup API endpoint in a local dict""" + if issnl in self._issnl_id_map: + return self._issnl_id_map[issnl] + container_id = None + try: + rv = self.api.lookup_container(issnl=issnl) + container_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._issnl_id_map[issnl] = container_id # might be None + return container_id + + def read_issn_map_file(self, issn_map_file): + print("Loading ISSN map file...") + self._issn_issnl_map = dict() + for line in issn_map_file: + if line.startswith("ISSN") or len(line) == 0: + continue + (issn, issnl) = line.split()[0:2] + self._issn_issnl_map[issn] = issnl + # double mapping makes lookups easy + self._issn_issnl_map[issnl] = issnl + print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map))) + + def issn2issnl(self, issn): + if issn is None: + return None + return self._issn_issnl_map.get(issn) + + class RecordPusher: """ @@ -155,15 +238,7 @@ class RecordPusher: """ def __init__(self, importer, **kwargs): - - eg_extra = kwargs.get('editgroup_extra', dict()) - eg_extra['git_rev'] = eg_extra.get('git_rev', - subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityImporter') - - self.api = api - self.bezerk_mode = kwargs.get('bezerk_mode', False) - self._editgroup_description = kwargs.get('editgroup_description') + self.importer = importer def run(self): """ @@ -177,6 +252,21 @@ class RecordPusher: raise NotImplementedError +class JsonLinePusher: + + def __init__(self, importer, in_file, **kwargs): + self.importer = importer + self.in_file = in_file + + def run(self): + for line in self.in_file: + if not line: + continue + record = json.loads(line) + self.importer.push_record(record) + print(self.importer.finish()) + + # from: https://docs.python.org/3/library/itertools.html def grouper(iterable, n, fillvalue=None): "Collect data into fixed-length chunks or blocks" diff --git a/python/fatcat_tools/importers/crossref.py b/python/fatcat_tools/importers/crossref.py index 8953dd82..6866cb6f 100644 --- a/python/fatcat_tools/importers/crossref.py +++ b/python/fatcat_tools/importers/crossref.py @@ -6,7 +6,7 @@ import datetime import itertools import subprocess import fatcat_client -from .common import FatcatImporter +from .common import EntityImporter # The docs/guide should be the cannonical home for these mappings; update there @@ -57,7 +57,7 @@ LICENSE_SLUG_MAP = { # http://www.springer.com/tdm doesn't seem like a license } -class CrossrefImporter(FatcatImporter): +class CrossrefImporter(EntityImporter): """ Importer for Crossref metadata. @@ -76,9 +76,9 @@ class CrossrefImporter(FatcatImporter): issn_map_file=issn_map_file, editgroup_description=eg_desc, editgroup_extra=eg_extra) + + self.create_containers = kwargs.get('create_containers') extid_map_file = kwargs.get('extid_map_file') - create_containers = kwargs.get('create_containers') - check_existing = kwargs.get('check_existing') self.extid_map_db = None if extid_map_file: db_uri = "file:{}?mode=ro".format(extid_map_file) @@ -86,8 +86,8 @@ class CrossrefImporter(FatcatImporter): self.extid_map_db = sqlite3.connect(db_uri, uri=True) else: print("Not using external ID map") - self.create_containers = create_containers - self.check_existing = check_existing + + self.read_issn_map_file(issn_map_file) def lookup_ext_ids(self, doi): if self.extid_map_db is None: @@ -110,38 +110,38 @@ class CrossrefImporter(FatcatImporter): def map_release_type(self, crossref_type): return CROSSREF_TYPE_MAP.get(crossref_type) - def parse_crossref_dict(self, obj): - """ - obj is a python dict (parsed from json). - returns a ReleaseEntity - """ + def map_container_type(self, crossref_type): + return CONTAINER_TYPE_MAP.get(release_type) - # Do require the 'title' keys to exsit, as release entities do - if (not 'title' in obj) or (not obj['title']): - return None + def want(self, obj): # Ways to be out of scope (provisionally) # journal-issue and journal-volume map to None, but allowed for now if obj.get('type') in (None, 'journal', 'proceedings', 'standard-series', 'report-series', 'book-series', 'book-set', 'book-track', 'proceedings-series'): - return None + return False - release_type = self.map_release_type(obj['type']) + # Do require the 'title' keys to exsit, as release entities do + if (not 'title' in obj) or (not obj['title']): + return False - # lookup existing DOI - existing_release = None - if self.check_existing: - try: - existing_release = self.api.lookup_release(doi=obj['DOI'].lower()) - except fatcat_client.rest.ApiException as err: - if err.status != 404: - raise err + # Can't handle such large lists yet + authors = len(obj.get('author', [])) + abstracts = len(obj.get('abstract', [])) + refs = len(obj.get('reference', [])) + if max(authors, abstracts, refs) > 750: + return False - # eventually we'll want to support "updates", but for now just skip if - # entity already exists - if existing_release: - return None + return True + + def parse_record(self, obj): + """ + obj is a python dict (parsed from json). + returns a ReleaseEntity + """ + + release_type = self.map_release_type(obj['type']) # contribs def do_contribs(obj_list, ctype): @@ -195,14 +195,15 @@ class CrossrefImporter(FatcatImporter): container_id = self.lookup_issnl(issnl) publisher = obj.get('publisher') - ce = None if (container_id is None and self.create_containers and (issnl is not None) and obj.get('container-title') and len(obj['container-title']) > 0): ce = fatcat_client.ContainerEntity( issnl=issnl, publisher=publisher, - container_type=CONTAINER_TYPE_MAP.get(release_type), + container_type=self.map_container_type(release_type), name=obj['container-title'][0]) + ce_edit = self.create_container(ce) + container_id = ce_edit.ident # license slug license_slug = None @@ -309,8 +310,7 @@ class CrossrefImporter(FatcatImporter): # TODO: filter out huge releases; we'll get them later (and fix bug in # fatcatd) - if max(len(contribs), len(refs), len(abstracts)) > 750: - return None + assert max(len(contribs), len(refs), len(abstracts)) <= 750 # release date parsing is amazingly complex raw_date = obj['issued']['date-parts'][0] @@ -354,41 +354,28 @@ class CrossrefImporter(FatcatImporter): contribs=contribs, refs=refs, ) - return (re, ce) + return re + + def try_update(self, re): + + # lookup existing DOI (don't need to try other ext idents for crossref) + existing_release = None + try: + existing_release = self.api.lookup_release(doi=re.doi) + except fatcat_client.rest.ApiException as err: + if err.status != 404: + raise err + # doesn't exist, need to update + return True + + # eventually we'll want to support "updates", but for now just skip if + # entity already exists + if existing_release: + self.counts['exists'] += 1 + return False + + return True + + def insert_batch(self, batch): + self.api.create_release_batch(batch, autoaccept=True) - def create_row(self, row, editgroup_id=None): - if row is None: - return - obj = json.loads(row) - entities = self.parse_crossref_dict(obj) - # XXX: - print(entities) - if entities is not None: - (re, ce) = entities - if ce is not None: - container = self.api.create_container(ce, editgroup_id=editgroup_id) - re.container_id = container.ident - self._issnl_id_map[ce.issnl] = container.ident - self.api.create_release(re, editgroup_id=editgroup_id) - self.counts['insert'] += 1 - - def create_batch(self, batch): - """Current work/release pairing disallows batch creation of releases. - Could do batch work creation and then match against releases, but meh.""" - release_batch = [] - for row in batch: - if row is None: - continue - obj = json.loads(row) - entities = self.parse_crossref_dict(obj) - if entities is not None: - (re, ce) = entities - if ce is not None: - ce_eg = self.api.create_editgroup(fatcat_client.Editgroup()) - container = self.api.create_container(ce, editgroup_id=ce_eg.editgroup_id) - self.api.accept_editgroup(ce_eg.editgroup_id) - re.container_id = container.ident - self._issnl_id_map[ce.issnl] = container.ident - release_batch.append(re) - self.api.create_release_batch(release_batch, autoaccept=True) - self.counts['insert'] += len(release_batch) |