From e0f70bbbcbcb6232cfb508ad5c0ae637391c4871 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 22 Jan 2019 22:04:39 -0800 Subject: refactor remaining importers --- python/fatcat_tools/importers/__init__.py | 2 +- python/fatcat_tools/importers/common.py | 264 ++++++++-------------- python/fatcat_tools/importers/crossref.py | 11 +- python/fatcat_tools/importers/grobid_metadata.py | 104 ++++++--- python/fatcat_tools/importers/journal_metadata.py | 49 ++-- python/fatcat_tools/importers/matched.py | 150 ++++++------ python/fatcat_tools/importers/orcid.py | 45 ++-- python/tests/import_crossref.py | 7 +- python/tests/import_grobid_metadata.py | 10 +- python/tests/import_journal_metadata.py | 8 +- python/tests/import_matched.py | 9 +- python/tests/import_orcid.py | 19 +- python/tests/transform_tests.py | 2 +- 13 files changed, 324 insertions(+), 356 deletions(-) (limited to 'python') diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 7b20868c..b709f714 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -12,7 +12,7 @@ To run an import you combine two classes; one each of: """ -from .common import FatcatImporter, JsonLinePusher, make_kafka_consumer +from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, KafkaJsonPusher, make_kafka_consumer from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP from .grobid_metadata import GrobidMetadataImporter from .journal_metadata import JournalMetadataImporter diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 604aa78b..e7fe2305 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -49,9 +49,11 @@ class EntityImporter: self.api = api self.bezerk_mode = kwargs.get('bezerk_mode', False) - self._editgroup_description = kwargs.get('editgroup_description') - self._editgroup_extra = kwargs.get('editgroup_extra') - self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0, 'sub.container': 0}) + self.serial_mode = kwargs.get('serial_mode', False) + self.edit_batch_size = kwargs.get('edit_batch_size', 100) + self.editgroup_description = kwargs.get('editgroup_description') + self.editgroup_extra = kwargs.get('editgroup_extra') + self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0}) self._edit_count = 0 self._editgroup_id = None self._entity_queue = [] @@ -69,7 +71,9 @@ class EntityImporter: self.counts['skip'] += 1 return entity = self.parse_record(raw_record) - assert entity + if not entity: + self.counts['skip'] += 1 + return if self.bezerk_mode: self.push_entity(entity) return @@ -85,8 +89,8 @@ class EntityImporter: if self._entity_queue: self.insert_batch(self._entity_queue) - self.counts['insert'] += len(_entity_queue) - self._entity_queue = 0 + self.counts['insert'] += len(self._entity_queue) + self._entity_queue = [] self.counts['total'] = 0 for key in ('skip', 'insert', 'update', 'exists'): @@ -101,17 +105,28 @@ class EntityImporter: if not self._editgroup_id: eg = self.api.create_editgroup( - description=self._editgroup_description, - extra=self._editgroup_extra) + fatcat_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra)) self._editgroup_id = eg.editgroup_id self._edit_count += edits return self._editgroup_id def create_container(self, entity): - eg = self._get_editgroup() - self.counts['sub.container'] += 1 - return self.api.create_container(entity, editgroup_id=eg.editgroup_id) + eg_id = self._get_editgroup() + self.counts['inserted.container'] += 1 + return self.api.create_container(entity, editgroup_id=eg_id) + + def create_release(self, entity): + eg_id = self._get_editgroup() + self.counts['inserted.release'] += 1 + return self.api.create_release(entity, editgroup_id=eg_id) + + def create_file(self, entity): + eg_id = self._get_editgroup() + self.counts['inserted.file'] += 1 + return self.api.create_file(entity, editgroup_id=eg_id) def updated(self): """ @@ -135,10 +150,10 @@ class EntityImporter: def parse(self, raw_record): """ - Returns an entity class type. expected to always succeed, or raise an - exception (`want()` should be used to filter out bad records). May have - side-effects (eg, create related entities), but shouldn't update/mutate - the actual entity. + Returns an entity class type, or None if we should skip this one. + + May have side-effects (eg, create related entities), but shouldn't + update/mutate the actual entity. """ raise NotImplementedError @@ -148,9 +163,10 @@ class EntityImporter: update it (PUT), decide we should do nothing (based on the existing record), or create a new one. - Implementations should update exists/updated counts appropriately. + Implementations must update the exists/updated/skip counts + appropriately in this method. - Returns boolean: True if + Returns boolean: True if the entity should still be inserted, False otherwise """ raise NotImplementedError @@ -230,7 +246,6 @@ class EntityImporter: return self._issn_issnl_map.get(issn) - class RecordPusher: """ Base class for different importer sources. Pretty trivial interface, just @@ -252,14 +267,14 @@ class RecordPusher: raise NotImplementedError -class JsonLinePusher: +class JsonLinePusher(RecordPusher): - def __init__(self, importer, in_file, **kwargs): + def __init__(self, importer, json_file, **kwargs): self.importer = importer - self.in_file = in_file + self.json_file = json_file def run(self): - for line in self.in_file: + for line in self.json_file: if not line: continue record = json.loads(line) @@ -267,11 +282,59 @@ class JsonLinePusher: print(self.importer.finish()) -# from: https://docs.python.org/3/library/itertools.html -def grouper(iterable, n, fillvalue=None): - "Collect data into fixed-length chunks or blocks" - args = [iter(iterable)] * n - return itertools.zip_longest(*args, fillvalue=fillvalue) +class CsvPusher(RecordPusher): + + def __init__(self, importer, csv_file, **kwargs): + self.importer = importer + self.reader = csv.DictReader(csv_file, delimiter=kwargs.get('delimiter', ',')) + + def run(self): + for line in self.reader: + if not line: + continue + self.importer.push_record(line) + print(self.importer.finish()) + + +class LinePusher(RecordPusher): + + def __init__(self, importer, text_file, **kwargs): + self.importer = importer + self.text_file = text_file + + def run(self): + for line in self.text_file: + if not line: + continue + self.importer.push_record(line) + print(self.importer.finish()) + + +class KafkaJsonPusher(RecordPusher): + + def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): + self.importer = importer + self.consumer = make_kafka_consumer( + kafka_hosts, + kafka_env, + topic_suffix, + group, + ) + + def run(self): + count = 0 + for msg in self.consumer: + if not msg: + continue + record = json.loads(msg.value.decode('utf-8')) + self.importer.push_record(record) + count += 1 + if count % 500 == 0: + print("Import counts: {}".format(self.importer.counts)) + # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or + # commit the current batch if it has been lingering + print(self.importer.finish()) + def make_kafka_consumer(hosts, env, topic_suffix, group): topic_name = "fatcat-{}.{}".format(env, topic_suffix).encode('utf-8') @@ -288,150 +351,3 @@ def make_kafka_consumer(hosts, env, topic_suffix, group): ) return consumer -class FatcatImporter: - """ - Base class for fatcat importers - """ - - def __init__(self, api, **kwargs): - - eg_extra = kwargs.get('editgroup_extra', dict()) - eg_extra['git_rev'] = eg_extra.get('git_rev', - subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') - eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.FatcatImporter') - - self.api = api - self._editgroup_description = kwargs.get('editgroup_description') - self._editgroup_extra = kwargs.get('editgroup_extra') - issn_map_file = kwargs.get('issn_map_file') - - self._issnl_id_map = dict() - self._orcid_id_map = dict() - self._doi_id_map = dict() - if issn_map_file: - self.read_issn_map_file(issn_map_file) - self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$") - self.counts = Counter({'insert': 0, 'update': 0, 'processed_lines': 0}) - - def _editgroup(self): - eg = fatcat_client.Editgroup( - description=self._editgroup_description, - extra=self._editgroup_extra, - ) - return self.api.create_editgroup(eg) - - def describe_run(self): - print("Processed {} lines, inserted {}, updated {}.".format( - self.counts['processed_lines'], self.counts['insert'], self.counts['update'])) - - def create_row(self, row, editgroup_id=None): - # sub-classes expected to implement this - raise NotImplementedError - - def create_batch(self, rows, editgroup_id=None): - # sub-classes expected to implement this - raise NotImplementedError - - def process_source(self, source, group_size=100): - """Creates and auto-accepts editgroup every group_size rows""" - eg = self._editgroup() - i = 0 - for i, row in enumerate(source): - self.create_row(row, editgroup_id=eg.editgroup_id) - if i > 0 and (i % group_size) == 0: - self.api.accept_editgroup(eg.editgroup_id) - eg = self._editgroup() - self.counts['processed_lines'] += 1 - if i == 0 or (i % group_size) != 0: - self.api.accept_editgroup(eg.editgroup_id) - - def process_batch(self, source, size=50, decode_kafka=False): - """Reads and processes in batches (not API-call-per-)""" - for rows in grouper(source, size): - if decode_kafka: - rows = [msg.value.decode('utf-8') for msg in rows] - self.counts['processed_lines'] += len(rows) - #eg = self._editgroup() - #self.create_batch(rows, editgroup_id=eg.editgroup_id) - self.create_batch(rows) - - def process_csv_source(self, source, group_size=100, delimiter=','): - reader = csv.DictReader(source, delimiter=delimiter) - self.process_source(reader, group_size) - - def process_csv_batch(self, source, size=50, delimiter=','): - reader = csv.DictReader(source, delimiter=delimiter) - self.process_batch(reader, size) - - def is_issnl(self, issnl): - return len(issnl) == 9 and issnl[4] == '-' - - def lookup_issnl(self, issnl): - """Caches calls to the ISSN-L lookup API endpoint in a local dict""" - if issnl in self._issnl_id_map: - return self._issnl_id_map[issnl] - container_id = None - try: - rv = self.api.lookup_container(issnl=issnl) - container_id = rv.ident - except ApiException as ae: - # If anything other than a 404 (not found), something is wrong - assert ae.status == 404 - self._issnl_id_map[issnl] = container_id # might be None - return container_id - - def is_orcid(self, orcid): - return self._orcid_regex.match(orcid) is not None - - def lookup_orcid(self, orcid): - """Caches calls to the Orcid lookup API endpoint in a local dict""" - if not self.is_orcid(orcid): - return None - if orcid in self._orcid_id_map: - return self._orcid_id_map[orcid] - creator_id = None - try: - rv = self.api.lookup_creator(orcid=orcid) - creator_id = rv.ident - except ApiException as ae: - # If anything other than a 404 (not found), something is wrong - assert ae.status == 404 - self._orcid_id_map[orcid] = creator_id # might be None - return creator_id - - def is_doi(self, doi): - return doi.startswith("10.") and doi.count("/") >= 1 - - def lookup_doi(self, doi): - """Caches calls to the doi lookup API endpoint in a local dict""" - assert self.is_doi(doi) - doi = doi.lower() - if doi in self._doi_id_map: - return self._doi_id_map[doi] - release_id = None - try: - rv = self.api.lookup_release(doi=doi) - release_id = rv.ident - except ApiException as ae: - # If anything other than a 404 (not found), something is wrong - assert ae.status == 404 - self._doi_id_map[doi] = release_id # might be None - return release_id - - def read_issn_map_file(self, issn_map_file): - print("Loading ISSN map file...") - self._issn_issnl_map = dict() - for line in issn_map_file: - if line.startswith("ISSN") or len(line) == 0: - continue - (issn, issnl) = line.split()[0:2] - self._issn_issnl_map[issn] = issnl - # double mapping makes lookups easy - self._issn_issnl_map[issnl] = issnl - print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map))) - - def issn2issnl(self, issn): - if issn is None: - return None - return self._issn_issnl_map.get(issn) - diff --git a/python/fatcat_tools/importers/crossref.py b/python/fatcat_tools/importers/crossref.py index 6866cb6f..22abd08d 100644 --- a/python/fatcat_tools/importers/crossref.py +++ b/python/fatcat_tools/importers/crossref.py @@ -359,9 +359,9 @@ class CrossrefImporter(EntityImporter): def try_update(self, re): # lookup existing DOI (don't need to try other ext idents for crossref) - existing_release = None + existing = None try: - existing_release = self.api.lookup_release(doi=re.doi) + existing = self.api.lookup_release(doi=re.doi) except fatcat_client.rest.ApiException as err: if err.status != 404: raise err @@ -370,12 +370,15 @@ class CrossrefImporter(EntityImporter): # eventually we'll want to support "updates", but for now just skip if # entity already exists - if existing_release: + if existing: self.counts['exists'] += 1 return False return True def insert_batch(self, batch): - self.api.create_release_batch(batch, autoaccept=True) + self.api.create_release_batch(batch, + autoaccept=True, + description=self.editgroup_description, + extra=json.dumps(self.editgroup_extra)) diff --git a/python/fatcat_tools/importers/grobid_metadata.py b/python/fatcat_tools/importers/grobid_metadata.py index 5e61a154..c1835b9f 100644 --- a/python/fatcat_tools/importers/grobid_metadata.py +++ b/python/fatcat_tools/importers/grobid_metadata.py @@ -5,12 +5,22 @@ import json import base64 import datetime import fatcat_client -from .common import FatcatImporter +from .common import EntityImporter MAX_ABSTRACT_BYTES=4096 -class GrobidMetadataImporter(FatcatImporter): +class GrobidMetadataImporter(EntityImporter): + """ + This is a complex case: we need to parse and create both file and release entities. + + The "primary" entity here is really File, not Release. If a matching File + exists, we bail in want(); if not we insert the Release during parsing, and + insert both. + + TODO: should instead check if the File has any releases; if not, insert and update. + TODO: relaxing 'None' constraint on parse_record() might make this refactor-able. + """ def __init__(self, api, **kwargs): @@ -23,10 +33,52 @@ class GrobidMetadataImporter(FatcatImporter): editgroup_extra=eg_extra) self.default_link_rel = kwargs.get("default_link_rel", "web") - def parse_grobid_json(self, obj): + def want(self, raw_record): + + fields = raw_record.split('\t') + sha1_key = fields[0] + sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower() + #cdx = json.loads(fields[1]) + #mimetype = fields[2] + #file_size = int(fields[3]) + grobid_meta = json.loads(fields[4]) + + if not grobid_meta.get('title'): + return False + + # lookup existing file SHA1 + try: + existing_file = self.api.lookup_file(sha1=sha1) + except fatcat_client.rest.ApiException as err: + if err.status != 404: + raise err + existing_file = None + + # if file is already in here, presumably not actually long-tail + # TODO: this is where we should check if the file actually has + # release_ids and/or URLs associated with it + if existing_file and not self.bezerk_mode: + return False + return True + + def parse_record(self, row): + + fields = row.split('\t') + sha1_key = fields[0] + cdx = json.loads(fields[1]) + mimetype = fields[2] + file_size = int(fields[3]) + grobid_meta = json.loads(fields[4]) + fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size) + re = self.parse_grobid_json(grobid_meta) + assert (fe and re) - if not obj.get('title'): - return None + release_edit = self.create_release(re) + fe.release_ids.append(release_edit.ident) + return fe + + def parse_grobid_json(self, obj): + assert obj.get('title') extra = dict() @@ -122,17 +174,6 @@ class GrobidMetadataImporter(FatcatImporter): sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower() - # lookup existing SHA1, or create new entity - try: - existing_file = self.api.lookup_file(sha1=sha1) - except fatcat_client.rest.ApiException as err: - if err.status != 404: - raise err - existing_file = None - - if existing_file: - # if file is already in here, presumably not actually long-tail - return None fe = fatcat_client.FileEntity( sha1=sha1, size=int(file_size), @@ -143,6 +184,7 @@ class GrobidMetadataImporter(FatcatImporter): # parse URLs and CDX original = cdx['url'] + assert len(cdx['dt']) >= 8 wayback = "https://web.archive.org/web/{}/{}".format( cdx['dt'], original) @@ -154,23 +196,13 @@ class GrobidMetadataImporter(FatcatImporter): return fe - def create_row(self, row, editgroup_id=None): - if not row: - return - fields = row.split('\t') - sha1_key = fields[0] - cdx = json.loads(fields[1]) - mimetype = fields[2] - file_size = int(fields[3]) - grobid_meta = json.loads(fields[4]) - fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size) - re = self.parse_grobid_json(grobid_meta) - if fe and re: - release_entity = self.api.create_release(re, editgroup_id=editgroup_id) - # release ident can't already be in release list because we just - # created it - fe.release_ids.append(release_entity.ident) - file_entity = self.api.create_file(fe, editgroup_id=editgroup_id) - self.counts['insert'] += 1 - - # NB: batch mode not implemented + def try_update(entity): + # we did this in want() + return True + + def insert_batch(self, batch): + self.api.create_file_batch(batch, + autoaccept=True, + description=self.editgroup_description, + extra=json.dumps(self.editgroup_extra)) + diff --git a/python/fatcat_tools/importers/journal_metadata.py b/python/fatcat_tools/importers/journal_metadata.py index cd058889..ff38cd77 100644 --- a/python/fatcat_tools/importers/journal_metadata.py +++ b/python/fatcat_tools/importers/journal_metadata.py @@ -3,7 +3,7 @@ import sys import json import itertools import fatcat_client -from .common import FatcatImporter +from .common import EntityImporter def or_none(s): @@ -25,7 +25,7 @@ def truthy(s): else: return None -class JournalMetadataImporter(FatcatImporter): +class JournalMetadataImporter(EntityImporter): """ Imports journal metadata ("containers") by ISSN, currently from a custom (data munged) .csv file format @@ -45,7 +45,12 @@ class JournalMetadataImporter(FatcatImporter): editgroup_description=eg_desc, editgroup_extra=eg_extra) - def parse_journal_metadata_row(self, row): + def want(self, raw_record): + if raw_record.get('ISSN-L'): + return True + return False + + def parse_record(self, row): """ row is a python dict (parsed from CSV). returns a ContainerEntity (or None if invalid or couldn't parse) @@ -72,16 +77,28 @@ class JournalMetadataImporter(FatcatImporter): extra=extra) return ce - def create_row(self, row, editgroup_id=None): - ce = self.parse_journal_metadata_row(row) - if ce is not None: - self.api.create_container(ce, editgroup_id=editgroup_id) - self.counts['insert'] += 1 - - def create_batch(self, batch): - """Reads and processes in batches (not API-call-per-line)""" - objects = [self.parse_journal_metadata_row(l) - for l in batch if (l is not None)] - objects = [o for o in objects if (o is not None)] - self.api.create_container_batch(objects, autoaccept=True) - self.counts['insert'] += len(objects) + def try_update(self, ce): + + existing = None + try: + existing = self.api.lookup_container(issnl=ce.issnl) + except fatcat_client.rest.ApiException as err: + if err.status != 404: + raise err + # doesn't exist, need to update + return True + + # eventually we'll want to support "updates", but for now just skip if + # entity already exists + if existing: + self.counts['exists'] += 1 + return False + + return True + + def insert_batch(self, batch): + self.api.create_container_batch(batch, + autoaccept=True, + description=self.editgroup_description, + extra=json.dumps(self.editgroup_extra)) + diff --git a/python/fatcat_tools/importers/matched.py b/python/fatcat_tools/importers/matched.py index 02c0a9c5..2be15860 100644 --- a/python/fatcat_tools/importers/matched.py +++ b/python/fatcat_tools/importers/matched.py @@ -4,16 +4,10 @@ import json import sqlite3 import itertools import fatcat_client -from .common import FatcatImporter +from .common import EntityImporter -#row = row.split('\t') -#assert len(row) == 2 -#sha1 = row[0].replace('sha1:') -#sha1 = base64.b16encode(base64.b32decode(sha1)).lower() -#print(sha1) -#dois = [d.lower() for d in json.loads(row[1])] -class MatchedImporter(FatcatImporter): +class MatchedImporter(EntityImporter): """ Importer for "file to crossref DOI" matches. @@ -59,26 +53,13 @@ class MatchedImporter(FatcatImporter): rel = "repository" elif "//web.archive.org/" in raw or "//archive.is/" in raw: rel = "webarchive" - return fatcat_client.FileEntityUrls(url=raw, rel=rel) + return (rel, raw) - def parse_matched_dict(self, obj): - sha1 = obj['sha1'] - dois = [d.lower() for d in obj.get('dois', [])] + def want(self, raw_record): + return True - # lookup sha1, or create new entity - fe = None - if not self.skip_file_updates: - try: - fe = self.api.lookup_file(sha1=sha1) - except fatcat_client.rest.ApiException as err: - if err.status != 404: - raise err - if fe is None: - fe = fatcat_client.FileEntity( - sha1=sha1, - release_ids=[], - urls=[], - ) + def parse_record(self, obj): + dois = [d.lower() for d in obj.get('dois', [])] # lookup dois re_list = set() @@ -93,67 +74,78 @@ class MatchedImporter(FatcatImporter): print("DOI not found: {}".format(doi)) else: re_list.add(re.ident) - if len(re_list) == 0: + release_ids = list(re_list) + if len(release_ids) == 0: return None - if fe.release_ids == set(re_list): - return None - re_list.update(fe.release_ids) - fe.release_ids = list(re_list) # parse URLs and CDX - existing_urls = [feu.url for feu in fe.urls] + urls = set() for url in obj.get('url', []): - if url not in existing_urls: - url = self.make_url(url) - if url != None: - fe.urls.append(url) + url = self.make_url(url) + if url != None: + urls.add(url) for cdx in obj.get('cdx', []): original = cdx['url'] wayback = "https://web.archive.org/web/{}/{}".format( cdx['dt'], original) - if wayback not in existing_urls: - fe.urls.append( - fatcat_client.FileEntityUrls(url=wayback, rel="webarchive")) - if original not in existing_urls: - url = self.make_url(original) - if url != None: - fe.urls.append(url) - - if obj.get('size') != None: - fe.size = int(obj['size']) - fe.sha256 = obj.get('sha256', fe.sha256) - fe.md5 = obj.get('md5', fe.sha256) - if obj.get('mimetype') is None: - if fe.mimetype is None: - fe.mimetype = self.default_mime - else: - fe.mimetype = obj.get('mimetype') + urls.add(("webarchive", wayback)) + url = self.make_url(original) + if url != None: + urls.add(url) + urls = [fatcat_client.FileEntityUrls(rel, url) for (rel, url) in urls] + if len(urls) == 0: + return None + + size = obj.get('size') + if size: + size = int(size) + + fe = fatcat_client.FileEntity( + md5=obj.get('md5'), + sha1=obj['sha1'], + sha256=obj.get('sha256'), + size=size, + mimetype=obj.get('mimetype'), + release_ids=release_ids, + urls=urls, + ) return fe - def create_row(self, row, editgroup_id=None): - obj = json.loads(row) - fe = self.parse_matched_dict(obj) - if fe is not None: - if fe.ident is None: - self.api.create_file(fe, editgroup_id=editgroup_id) - self.counts['insert'] += 1 - else: - self.api.update_file(fe.ident, fe, editgroup_id=editgroup_id) - self.counts['update'] += 1 - - def create_batch(self, batch): - """Reads and processes in batches (not API-call-per-line)""" - objects = [self.parse_matched_dict(json.loads(l)) - for l in batch if l != None] - new_objects = [o for o in objects if o != None and o.ident == None] - update_objects = [o for o in objects if o != None and o.ident != None] - if len(update_objects): - update_eg = self._editgroup().editgroup_id - for obj in update_objects: - self.api.update_file(obj.ident, obj, editgroup_id=update_eg) - self.api.accept_editgroup(update_eg) - if len(new_objects) > 0: - self.api.create_file_batch(new_objects, autoaccept=True) - self.counts['update'] += len(update_objects) - self.counts['insert'] += len(new_objects) + def try_update(self, fe): + # lookup sha1, or create new entity + existing = None + if not self.skip_file_updates: + try: + existing = self.api.lookup_file(sha1=fe.sha1) + except fatcat_client.rest.ApiException as err: + if err.status != 404: + raise err + + if not existing: + # new match; go ahead and insert + return True + + fe.release_ids = list(set(fe.release_ids + existing.release_ids)) + if set(fe.release_ids) == set(existing.release_ids) and len(existing.urls) > 0: + # no new release matches *and* there are already existing URLs + self.counts['exists'] += 1 + return False + + # merge the existing into this one and update + existing.urls = list(set(fe.urls + existing.urls)) + existing.release_ids = list(set(fe.release_ids + existing.release_ids)) + existing.mimetype = existing.mimetype or fe.mimetype + existing.size = existing.size or fe.size + existing.md5 = existing.md5 or fe.md5 + existing.sha256 = existing.sha256 or fe.sha256 + self.api.update_file(existing.ident, existing) + self.counts['update'] += 1 + return False + + def insert_batch(self, batch): + self.api.create_file_batch(batch, + autoaccept=True, + description=self.editgroup_description, + extra=json.dumps(self.editgroup_extra)) + diff --git a/python/fatcat_tools/importers/orcid.py b/python/fatcat_tools/importers/orcid.py index d03b2855..2c39db18 100644 --- a/python/fatcat_tools/importers/orcid.py +++ b/python/fatcat_tools/importers/orcid.py @@ -3,7 +3,7 @@ import sys import json import itertools import fatcat_client -from .common import FatcatImporter +from .common import EntityImporter def value_or_none(e): if type(e) == dict: @@ -20,7 +20,7 @@ def value_or_none(e): return None return e -class OrcidImporter(FatcatImporter): +class OrcidImporter(EntityImporter): def __init__(self, api, **kwargs): @@ -32,14 +32,16 @@ class OrcidImporter(FatcatImporter): editgroup_description=eg_desc, editgroup_extra=eg_extra) - def parse_orcid_dict(self, obj): + def want(self, raw_record): + return True + + def parse_record(self, obj): """ obj is a python dict (parsed from json). returns a CreatorEntity """ name = obj['person']['name'] - if name is None: - return None + assert name extra = None given = value_or_none(name.get('given-names')) sur = value_or_none(name.get('family-name')) @@ -67,17 +69,24 @@ class OrcidImporter(FatcatImporter): extra=extra) return ce - def create_row(self, row, editgroup_id=None): - obj = json.loads(row) - ce = self.parse_orcid_dict(obj) - if ce is not None: - self.api.create_creator(ce, editgroup_id=editgroup_id) - self.counts['insert'] += 1 + def try_update(self, raw_record): + existing = None + try: + existing = self.api.lookup_creator(orcid=raw_record.orcid) + except fatcat_client.rest.ApiException as err: + if err.status != 404: + raise err + + # eventually we'll want to support "updates", but for now just skip if + # entity already exists + if existing: + self.counts['exists'] += 1 + return False + + return True - def create_batch(self, batch): - """Reads and processes in batches (not API-call-per-line)""" - objects = [self.parse_orcid_dict(json.loads(l)) - for l in batch if l != None] - objects = [o for o in objects if o != None] - self.api.create_creator_batch(objects, autoaccept=True) - self.counts['insert'] += len(objects) + def insert_batch(self, batch): + self.api.create_creator_batch(batch, + autoaccept=True, + description=self.editgroup_description, + extra=json.dumps(self.editgroup_extra)) diff --git a/python/tests/import_crossref.py b/python/tests/import_crossref.py index 5e0a150f..db49bb7f 100644 --- a/python/tests/import_crossref.py +++ b/python/tests/import_crossref.py @@ -17,13 +17,12 @@ def crossref_importer_existing(api): def test_crossref_importer_batch(crossref_importer): with open('tests/files/crossref-works.2018-01-21.badsample.json', 'r') as f: - pusher = JsonLinePusher(crossref_importer, f) - pusher.run() + JsonLinePusher(crossref_importer, f).run() def test_crossref_importer(crossref_importer): with open('tests/files/crossref-works.2018-01-21.badsample.json', 'r') as f: - pusher = JsonLinePusher(crossref_importer, f) - pusher.run() + crossref_importer.bezerk_mode = True + JsonLinePusher(crossref_importer, f).run() # fetch most recent editgroup changes = crossref_importer.api.get_changelog(limit=1) eg = changes[0].editgroup diff --git a/python/tests/import_grobid_metadata.py b/python/tests/import_grobid_metadata.py index 97ebcaef..f00479d8 100644 --- a/python/tests/import_grobid_metadata.py +++ b/python/tests/import_grobid_metadata.py @@ -3,7 +3,7 @@ import os import json import base64 import pytest -from fatcat_tools.importers import GrobidMetadataImporter +from fatcat_tools.importers import GrobidMetadataImporter, LinePusher from fixtures import api """ @@ -15,10 +15,6 @@ side-effects. Should probably be disabled or re-written. def grobid_metadata_importer(api): yield GrobidMetadataImporter(api) -# TODO: use API to check that entities actually created... -#def test_grobid_metadata_importer_batch(grobid_metadata_importer): -# with open('tests/files/example_grobid_metadata_lines.tsv', 'r') as f: -# grobid_metadata_importer.process_batch(f) def test_grobid_metadata_parse(grobid_metadata_importer): with open('tests/files/example_grobid_metadata_lines.tsv', 'r') as f: @@ -52,9 +48,11 @@ def test_file_metadata_parse(grobid_metadata_importer): assert fe.urls[0].rel == "webarchive" assert len(fe.release_ids) == 0 +# TODO: use API to check that entities actually created... def test_grobid_metadata_importer(grobid_metadata_importer): with open('tests/files/example_grobid_metadata_lines.tsv', 'r') as f: - grobid_metadata_importer.process_source(f) + grobid_metadata_importer.bezerk_mode = True + LinePusher(grobid_metadata_importer, f).run() # fetch most recent editgroup changes = grobid_metadata_importer.api.get_changelog(limit=1) diff --git a/python/tests/import_journal_metadata.py b/python/tests/import_journal_metadata.py index 81334bc6..0263f706 100644 --- a/python/tests/import_journal_metadata.py +++ b/python/tests/import_journal_metadata.py @@ -1,6 +1,6 @@ import pytest -from fatcat_tools.importers import JournalMetadataImporter +from fatcat_tools.importers import JournalMetadataImporter, CsvPusher from fixtures import api @@ -11,11 +11,13 @@ def journal_metadata_importer(api): # TODO: use API to check that entities actually created... def test_journal_metadata_importer_batch(journal_metadata_importer): with open('tests/files/journal_extra_metadata.snip.csv', 'r') as f: - journal_metadata_importer.process_csv_batch(f) + CsvPusher(journal_metadata_importer, f).run() def test_journal_metadata_importer(journal_metadata_importer): with open('tests/files/journal_extra_metadata.snip.csv', 'r') as f: - journal_metadata_importer.process_csv_source(f) + journal_metadata_importer.bezerk_mode = True + journal_metadata_importer.serial_mode = True + CsvPusher(journal_metadata_importer, f).run() # fetch most recent editgroup changes = journal_metadata_importer.api.get_changelog(limit=1) diff --git a/python/tests/import_matched.py b/python/tests/import_matched.py index 080674ac..a58c402f 100644 --- a/python/tests/import_matched.py +++ b/python/tests/import_matched.py @@ -1,7 +1,7 @@ import json import pytest -from fatcat_tools.importers import MatchedImporter +from fatcat_tools.importers import MatchedImporter, JsonLinePusher from fixtures import api @@ -12,11 +12,12 @@ def matched_importer(api): # TODO: use API to check that entities actually created... def test_matched_importer_batch(matched_importer): with open('tests/files/example_matched.json', 'r') as f: - matched_importer.process_batch(f) + JsonLinePusher(matched_importer, f).run() def test_matched_importer(matched_importer): with open('tests/files/example_matched.json', 'r') as f: - matched_importer.process_source(f) + matched_importer.bezerk_mode = True + JsonLinePusher(matched_importer, f).run() # fetch most recent editgroup changes = matched_importer.api.get_changelog(limit=1) @@ -29,7 +30,7 @@ def test_matched_importer(matched_importer): def test_matched_dict_parse(matched_importer): with open('tests/files/example_matched.json', 'r') as f: raw = json.loads(f.readline()) - f = matched_importer.parse_matched_dict(raw) + f = matched_importer.parse_record(raw) assert f.sha1 == "00242a192acc258bdfdb151943419437f440c313" assert f.md5 == "f4de91152c7ab9fdc2a128f962faebff" assert f.mimetype == "application/pdf" diff --git a/python/tests/import_orcid.py b/python/tests/import_orcid.py index 717a1328..9e898521 100644 --- a/python/tests/import_orcid.py +++ b/python/tests/import_orcid.py @@ -1,7 +1,7 @@ import json import pytest -from fatcat_tools.importers import OrcidImporter +from fatcat_tools.importers import OrcidImporter, JsonLinePusher from fixtures import api @@ -9,18 +9,16 @@ from fixtures import api def orcid_importer(api): yield OrcidImporter(api) -# TODO: use API to check that entities actually created... -def test_orcid_importer_batch(orcid_importer): - with open('tests/files/0000-0001-8254-7103.json', 'r') as f: - orcid_importer.process_batch(f) - def test_orcid_importer_badid(orcid_importer): with open('tests/files/0000-0001-8254-710X.json', 'r') as f: - orcid_importer.process_batch(f) + pusher = JsonLinePusher(orcid_importer, f) + pusher.run() +# TODO: use API to check that entities actually created... def test_orcid_importer(orcid_importer): with open('tests/files/0000-0001-8254-7103.json', 'r') as f: - orcid_importer.process_source(f) + orcid_importer.bezerk_mode = True + JsonLinePusher(orcid_importer, f).run() # fetch most recent editgroup changes = orcid_importer.api.get_changelog(limit=1) @@ -32,14 +30,15 @@ def test_orcid_importer(orcid_importer): def test_orcid_importer_x(orcid_importer): with open('tests/files/0000-0003-3953-765X.json', 'r') as f: - orcid_importer.process_source(f) + pusher = JsonLinePusher(orcid_importer, f) + pusher.run() c = orcid_importer.api.lookup_creator(orcid="0000-0003-3953-765X") assert c is not None def test_orcid_dict_parse(orcid_importer): with open('tests/files/0000-0001-8254-7103.json', 'r') as f: raw = json.loads(f.readline()) - c = orcid_importer.parse_orcid_dict(raw) + c = orcid_importer.parse_record(raw) assert c.given_name == "Man-Hui" assert c.surname == "Li" assert c.display_name == "Man-Hui Li" diff --git a/python/tests/transform_tests.py b/python/tests/transform_tests.py index e9d23250..6d6c6c82 100644 --- a/python/tests/transform_tests.py +++ b/python/tests/transform_tests.py @@ -11,7 +11,7 @@ def test_elasticsearch_convert(crossref_importer): with open('tests/files/crossref-works.single.json', 'r') as f: # not a single line raw = json.loads(f.read()) - (r, c) = crossref_importer.parse_crossref_dict(raw) + r = crossref_importer.parse_record(raw) r.state = 'active' release_to_elasticsearch(r) -- cgit v1.2.3