From 83d5c49f5093b1820b625e1b3a1e21fc7242f79e Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Tue, 22 Jan 2019 19:19:31 -0800
Subject: refactored crossref importer to new style

---
 python/fatcat_tools/importers/__init__.py |   6 +-
 python/fatcat_tools/importers/common.py   | 124 +++++++++++++++++++++++++----
 python/fatcat_tools/importers/crossref.py | 125 +++++++++++++----------------
 python/tests/import_crossref.py           |  27 ++++---
 python/tests/importer.py                  |  34 ++++----
 5 files changed, 198 insertions(+), 118 deletions(-)

diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index f2caca5c..7b20868c 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -12,11 +12,11 @@ To run an import you combine two classes; one each of:
 """
 
-from .common import FatcatImporter, make_kafka_consumer
+from .common import FatcatImporter, JsonLinePusher, make_kafka_consumer
 from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP
 from .grobid_metadata import GrobidMetadataImporter
 from .journal_metadata import JournalMetadataImporter
 from .matched import MatchedImporter
 from .orcid import OrcidImporter
-from .kafka_source import KafkaSource
-from .file_source import FileSource
+#from .kafka_source import KafkaSource
+#from .file_source import FileSource
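In the new style, a run wires exactly one importer to one pusher. A minimal sketch of the combination (assumptions: `api` is an already-configured fatcat_client API instance, and both file paths are placeholders):

    from fatcat_tools.importers import CrossrefImporter, JsonLinePusher

    # Sketch only: `api` and the file paths below are assumed/placeholders.
    with open('ISSN-to-ISSN-L.txt', 'r') as issn_file:
        importer = CrossrefImporter(api, issn_file, bezerk_mode=False)
    with open('crossref-works.json', 'r') as f:
        JsonLinePusher(importer, f).run()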
""" - if (not raw_record) or (not self.want(raw_record): + if (not raw_record) or (not self.want(raw_record)): self.counts['skip'] += 1 return entity = self.parse_record(raw_record) + assert entity if self.bezerk_mode: self.push_entity(entity) return @@ -68,7 +77,7 @@ class EntityImporter: self.push_entity(entity) return - def finish(self, raw_record): + def finish(self): if self._edit_count > 0: self.api.accept_editgroup(self._editgroup_id) self._editgroup_id = None @@ -79,8 +88,9 @@ class EntityImporter: self.counts['insert'] += len(_entity_queue) self._entity_queue = 0 - self.counts['total'] = counts['skip'] + counts['insert'] + \ - counts['update'] + counts['exists'] + self.counts['total'] = 0 + for key in ('skip', 'insert', 'update', 'exists'): + self.counts['total'] += self.counts[key] return self.counts def _get_editgroup(self, edits=1): @@ -100,8 +110,8 @@ class EntityImporter: def create_container(self, entity): eg = self._get_editgroup() - self.api.create_container(entity, editgroup_id=eg.editgroup_id) self.counts['sub.container'] += 1 + return self.api.create_container(entity, editgroup_id=eg.editgroup_id) def updated(self): """ @@ -147,6 +157,79 @@ class EntityImporter: def insert_batch(self, raw_record): raise NotImplementedError + def is_orcid(self, orcid): + return self._orcid_regex.match(orcid) is not None + + def lookup_orcid(self, orcid): + """Caches calls to the Orcid lookup API endpoint in a local dict""" + if not self.is_orcid(orcid): + return None + if orcid in self._orcid_id_map: + return self._orcid_id_map[orcid] + creator_id = None + try: + rv = self.api.lookup_creator(orcid=orcid) + creator_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._orcid_id_map[orcid] = creator_id # might be None + return creator_id + + def is_doi(self, doi): + return doi.startswith("10.") and doi.count("/") >= 1 + + def lookup_doi(self, doi): + """Caches calls to the doi lookup API endpoint in a local dict""" + assert self.is_doi(doi) + doi = doi.lower() + if doi in self._doi_id_map: + return self._doi_id_map[doi] + release_id = None + try: + rv = self.api.lookup_release(doi=doi) + release_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._doi_id_map[doi] = release_id # might be None + return release_id + + def is_issnl(self, issnl): + return len(issnl) == 9 and issnl[4] == '-' + + def lookup_issnl(self, issnl): + """Caches calls to the ISSN-L lookup API endpoint in a local dict""" + if issnl in self._issnl_id_map: + return self._issnl_id_map[issnl] + container_id = None + try: + rv = self.api.lookup_container(issnl=issnl) + container_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._issnl_id_map[issnl] = container_id # might be None + return container_id + + def read_issn_map_file(self, issn_map_file): + print("Loading ISSN map file...") + self._issn_issnl_map = dict() + for line in issn_map_file: + if line.startswith("ISSN") or len(line) == 0: + continue + (issn, issnl) = line.split()[0:2] + self._issn_issnl_map[issn] = issnl + # double mapping makes lookups easy + self._issn_issnl_map[issnl] = issnl + print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map))) + + def issn2issnl(self, issn): + if issn is None: + return None + return self._issn_issnl_map.get(issn) + + class RecordPusher: """ @@ -155,15 +238,7 
diff --git a/python/fatcat_tools/importers/crossref.py b/python/fatcat_tools/importers/crossref.py
index 8953dd82..6866cb6f 100644
--- a/python/fatcat_tools/importers/crossref.py
+++ b/python/fatcat_tools/importers/crossref.py
@@ -6,7 +6,7 @@ import datetime
 import itertools
 import subprocess
 import fatcat_client
-from .common import FatcatImporter
+from .common import EntityImporter
 
 
 # The docs/guide should be the canonical home for these mappings; update there
@@ -57,7 +57,7 @@ LICENSE_SLUG_MAP = {
     # http://www.springer.com/tdm doesn't seem like a license
 }
 
-class CrossrefImporter(FatcatImporter):
+class CrossrefImporter(EntityImporter):
     """
     Importer for Crossref metadata.
 
@@ -76,9 +76,9 @@ class CrossrefImporter(FatcatImporter):
             issn_map_file=issn_map_file,
             editgroup_description=eg_desc,
             editgroup_extra=eg_extra)
+
+        self.create_containers = kwargs.get('create_containers')
         extid_map_file = kwargs.get('extid_map_file')
-        create_containers = kwargs.get('create_containers')
-        check_existing = kwargs.get('check_existing')
         self.extid_map_db = None
         if extid_map_file:
             db_uri = "file:{}?mode=ro".format(extid_map_file)
@@ -86,8 +86,8 @@ class CrossrefImporter(FatcatImporter):
             self.extid_map_db = sqlite3.connect(db_uri, uri=True)
         else:
             print("Not using external ID map")
-        self.create_containers = create_containers
-        self.check_existing = check_existing
+
+        self.read_issn_map_file(issn_map_file)
 
     def lookup_ext_ids(self, doi):
         if self.extid_map_db is None:
@@ -110,38 +110,38 @@ class CrossrefImporter(FatcatImporter):
     def map_release_type(self, crossref_type):
         return CROSSREF_TYPE_MAP.get(crossref_type)
 
-    def parse_crossref_dict(self, obj):
-        """
-        obj is a python dict (parsed from json).
-        returns a ReleaseEntity
-        """
+    def map_container_type(self, release_type):
+        return CONTAINER_TYPE_MAP.get(release_type)
 
-        # Do require the 'title' key to exist, as release entities do
-        if (not 'title' in obj) or (not obj['title']):
-            return None
+    def want(self, obj):
 
         # Ways to be out of scope (provisionally)
         # journal-issue and journal-volume map to None, but allowed for now
         if obj.get('type') in (None, 'journal', 'proceedings',
                 'standard-series', 'report-series', 'book-series', 'book-set',
                 'book-track', 'proceedings-series'):
-            return None
+            return False
 
-        release_type = self.map_release_type(obj['type'])
+        # Do require the 'title' key to exist, as release entities do
+        if (not 'title' in obj) or (not obj['title']):
+            return False
 
-        # lookup existing DOI
-        existing_release = None
-        if self.check_existing:
-            try:
-                existing_release = self.api.lookup_release(doi=obj['DOI'].lower())
-            except fatcat_client.rest.ApiException as err:
-                if err.status != 404:
-                    raise err
+        # Can't handle such large lists yet
+        authors = len(obj.get('author', []))
+        abstracts = len(obj.get('abstract', []))
+        refs = len(obj.get('reference', []))
+        if max(authors, abstracts, refs) > 750:
+            return False
 
-        # eventually we'll want to support "updates", but for now just skip if
-        # entity already exists
-        if existing_release:
-            return None
+        return True
+
+    def parse_record(self, obj):
+        """
+        obj is a python dict (parsed from json).
+        returns a ReleaseEntity
+        """
+
+        release_type = self.map_release_type(obj['type'])
 
         # contribs
         def do_contribs(obj_list, ctype):
@@ -195,14 +195,15 @@
             container_id = self.lookup_issnl(issnl)
         publisher = obj.get('publisher')
 
-        ce = None
         if (container_id is None and self.create_containers and (issnl is not None)
                 and obj.get('container-title') and len(obj['container-title']) > 0):
             ce = fatcat_client.ContainerEntity(
                 issnl=issnl,
                 publisher=publisher,
-                container_type=CONTAINER_TYPE_MAP.get(release_type),
+                container_type=self.map_container_type(release_type),
                 name=obj['container-title'][0])
+            ce_edit = self.create_container(ce)
+            container_id = ce_edit.ident
 
         # license slug
         license_slug = None
@@ -309,8 +310,7 @@
 
         # TODO: filter out huge releases; we'll get them later (and fix bug in
        # fatcatd)
-        if max(len(contribs), len(refs), len(abstracts)) > 750:
-            return None
+        assert max(len(contribs), len(refs), len(abstracts)) <= 750
 
         # release date parsing is amazingly complex
         raw_date = obj['issued']['date-parts'][0]
@@ -354,41 +354,28 @@
             contribs=contribs,
             refs=refs,
         )
-        return (re, ce)
+        return re
+
+    def try_update(self, re):
+
+        # lookup existing DOI (don't need to try other ext idents for crossref)
+        existing_release = None
+        try:
+            existing_release = self.api.lookup_release(doi=re.doi)
+        except fatcat_client.rest.ApiException as err:
+            if err.status != 404:
+                raise err
+            # doesn't exist, need to insert
+            return True
+
+        # eventually we'll want to support "updates", but for now just skip if
+        # entity already exists
+        if existing_release:
+            self.counts['exists'] += 1
+            return False
+
+        return True
+
+    def insert_batch(self, batch):
+        self.api.create_release_batch(batch, autoaccept=True)
 
-    def create_row(self, row, editgroup_id=None):
-        if row is None:
-            return
-        obj = json.loads(row)
-        entities = self.parse_crossref_dict(obj)
-        # XXX:
-        print(entities)
-        if entities is not None:
-            (re, ce) = entities
-            if ce is not None:
-                container = self.api.create_container(ce, editgroup_id=editgroup_id)
-                re.container_id = container.ident
-                self._issnl_id_map[ce.issnl] = container.ident
-            self.api.create_release(re, editgroup_id=editgroup_id)
-            self.counts['insert'] += 1
-
-    def create_batch(self, batch):
-        """Current work/release pairing disallows batch creation of releases.
-        Could do batch work creation and then match against releases, but meh."""
-        release_batch = []
-        for row in batch:
-            if row is None:
-                continue
-            obj = json.loads(row)
-            entities = self.parse_crossref_dict(obj)
-            if entities is not None:
-                (re, ce) = entities
-                if ce is not None:
-                    ce_eg = self.api.create_editgroup(fatcat_client.Editgroup())
-                    container = self.api.create_container(ce, editgroup_id=ce_eg.editgroup_id)
-                    self.api.accept_editgroup(ce_eg.editgroup_id)
-                    re.container_id = container.ident
-                    self._issnl_id_map[ce.issnl] = container.ident
-                release_batch.append(re)
-        self.api.create_release_batch(release_batch, autoaccept=True)
-        self.counts['insert'] += len(release_batch)
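Outside of diff context, the lifecycle these methods implement runs roughly as follows (a sketch mirroring push_record() and finish(); `api`, `issn_file`, and the parsed `obj` are assumed to exist):

    importer = CrossrefImporter(api, issn_file, bezerk_mode=False)
    if importer.want(obj):
        release = importer.parse_record(obj)
        if importer.try_update(release):    # True: no release with this DOI yet
            importer.push_entity(release)   # queued until insert_batch() flushes
    print(importer.finish())                # accepts editgroup, returns counts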
diff --git a/python/tests/import_crossref.py b/python/tests/import_crossref.py
index 89ce9fc9..5e0a150f 100644
--- a/python/tests/import_crossref.py
+++ b/python/tests/import_crossref.py
@@ -1,27 +1,29 @@
 
 import json
 import pytest
 
-from fatcat_tools.importers import CrossrefImporter
+from fatcat_tools.importers import CrossrefImporter, JsonLinePusher
 from fixtures import api
 
 
 @pytest.fixture(scope="function")
 def crossref_importer(api):
     with open('tests/files/ISSN-to-ISSN-L.snip.txt', 'r') as issn_file:
-        yield CrossrefImporter(api, issn_file, extid_map_file='tests/files/example_map.sqlite3', check_existing=False)
+        yield CrossrefImporter(api, issn_file, extid_map_file='tests/files/example_map.sqlite3', bezerk_mode=True)
 
 @pytest.fixture(scope="function")
 def crossref_importer_existing(api):
     with open('tests/files/ISSN-to-ISSN-L.snip.txt', 'r') as issn_file:
-        yield CrossrefImporter(api, issn_file, extid_map_file='tests/files/example_map.sqlite3', check_existing=True)
+        yield CrossrefImporter(api, issn_file, extid_map_file='tests/files/example_map.sqlite3', bezerk_mode=False)
 
 def test_crossref_importer_batch(crossref_importer):
     with open('tests/files/crossref-works.2018-01-21.badsample.json', 'r') as f:
-        crossref_importer.process_batch(f)
+        pusher = JsonLinePusher(crossref_importer, f)
+        pusher.run()
 
 def test_crossref_importer(crossref_importer):
     with open('tests/files/crossref-works.2018-01-21.badsample.json', 'r') as f:
-        crossref_importer.process_source(f)
+        pusher = JsonLinePusher(crossref_importer, f)
+        pusher.run()
     # fetch most recent editgroup
     changes = crossref_importer.api.get_changelog(limit=1)
     eg = changes[0].editgroup
@@ -39,13 +41,14 @@ def test_crossref_mappings(crossref_importer):
 def test_crossref_importer_create(crossref_importer):
     crossref_importer.create_containers = True
     with open('tests/files/crossref-works.2018-01-21.badsample.json', 'r') as f:
-        crossref_importer.process_source(f)
+        pusher = JsonLinePusher(crossref_importer, f)
+        pusher.run()
 
 def test_crossref_dict_parse(crossref_importer):
     with open('tests/files/crossref-works.single.json', 'r') as f:
         # not a single line
         raw = json.loads(f.read())
-        (r, c) = crossref_importer.parse_crossref_dict(raw)
+        r = crossref_importer.parse_record(raw)
     extra = r.extra['crossref']
     assert r.title == "Renormalized perturbation theory by the moment method for degenerate states: Anharmonic oscillators"
     assert r.doi == "10.1002/(sici)1097-461x(1998)66:4<261::aid-qua1>3.0.co;2-t"
@@ -79,8 +82,10 @@ def test_stateful_checking(crossref_importer_existing):
     with open('tests/files/crossref-works.single.json', 'r') as f:
         # not a single line, a whole document
-        raw = json.loads(f.read())
+        raw = f.read()
         # might not exist yet...
-        crossref_importer_existing.process_source([json.dumps(raw)])
-        # ok, make sure we get 'None' back
-        assert crossref_importer_existing.parse_crossref_dict(raw) is None
+        crossref_importer_existing.push_record(json.loads(raw))
+        crossref_importer_existing.finish()
+        # make sure we wouldn't insert again
+        entity = crossref_importer_existing.parse_record(json.loads(raw))
+        assert crossref_importer_existing.try_update(entity) is False
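One invariant these tests could also lean on: finish() returns the counts dict, and 'total' is defined as the sum of the other four counters, so a check like this hypothetical snippet should always hold:

    counts = crossref_importer_existing.finish()
    assert counts['total'] == (counts['skip'] + counts['insert'] +
                               counts['update'] + counts['exists'])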
diff --git a/python/tests/importer.py b/python/tests/importer.py
index 34efa5d8..9308ba84 100644
--- a/python/tests/importer.py
+++ b/python/tests/importer.py
@@ -1,13 +1,13 @@
 
 import pytest
 
-from fatcat_tools.importers import FatcatImporter
+from fatcat_tools.importers import CrossrefImporter, OrcidImporter
 from fixtures import api
 
 
 def test_issnl_mapping_lookup(api):
     with open('tests/files/ISSN-to-ISSN-L.snip.txt', 'r') as issn_file:
-        fi = FatcatImporter(api, issn_map_file=issn_file)
+        fi = CrossrefImporter(api, issn_map_file=issn_file)
 
     assert fi.issn2issnl('0000-0027') == '0002-0027'
     assert fi.issn2issnl('0002-0027') == '0002-0027'
@@ -18,20 +18,18 @@ def test_issnl_mapping_lookup(api):
 def test_identifiers(api):
 
     with open('tests/files/ISSN-to-ISSN-L.snip.txt', 'r') as issn_file:
-        fi = FatcatImporter(api, issn_map_file=issn_file)
-
-    assert fi.is_issnl("1234-5678") == True
-    assert fi.is_issnl("1234-5678.") == False
-    assert fi.is_issnl("12345678") == False
-    assert fi.is_issnl("1-2345678") == False
-
-    assert fi.is_doi("10.1234/56789") == True
-    assert fi.is_doi("101234/56789") == False
-    assert fi.is_doi("10.1234_56789") == False
-
-    assert fi.is_orcid("0000-0003-3118-6591") == True
-    assert fi.is_orcid("0000-0003-3953-765X") == True
-    assert fi.is_orcid("0000-00x3-3118-659") == False
-    assert fi.is_orcid("0000-00033118-659") == False
-    assert fi.is_orcid("0000-0003-3118-659.") == False
+        ci = CrossrefImporter(api, issn_map_file=issn_file)
+
+    assert ci.is_issnl("1234-5678") == True
+    assert ci.is_issnl("1234-5678.") == False
+    assert ci.is_issnl("12345678") == False
+    assert ci.is_issnl("1-2345678") == False
+
+    oi = OrcidImporter(api)
+
+    assert oi.is_orcid("0000-0003-3118-6591") == True
+    assert oi.is_orcid("0000-0003-3953-765X") == True
+    assert oi.is_orcid("0000-00x3-3118-659") == False
+    assert oi.is_orcid("0000-00033118-659") == False
+    assert oi.is_orcid("0000-0003-3118-659.") == False
-- 
cgit v1.2.3