Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r--  python/fatcat_tools/importers/__init__.py          |   2
-rw-r--r--  python/fatcat_tools/importers/common.py            | 264
-rw-r--r--  python/fatcat_tools/importers/crossref.py           |  11
-rw-r--r--  python/fatcat_tools/importers/grobid_metadata.py    | 104
-rw-r--r--  python/fatcat_tools/importers/journal_metadata.py   |  49
-rw-r--r--  python/fatcat_tools/importers/matched.py            | 150
-rw-r--r--  python/fatcat_tools/importers/orcid.py              |  45
7 files changed, 297 insertions, 328 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 7b20868c..b709f714 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -12,7 +12,7 @@ To run an import you combine two classes; one each of:
 
 """
-from .common import FatcatImporter, JsonLinePusher, make_kafka_consumer
+from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, KafkaJsonPusher, make_kafka_consumer
 from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP
 from .grobid_metadata import GrobidMetadataImporter
 from .journal_metadata import JournalMetadataImporter
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 604aa78b..e7fe2305 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -49,9 +49,11 @@ class EntityImporter:
 
         self.api = api
         self.bezerk_mode = kwargs.get('bezerk_mode', False)
-        self._editgroup_description = kwargs.get('editgroup_description')
-        self._editgroup_extra = kwargs.get('editgroup_extra')
-        self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0, 'sub.container': 0})
+        self.serial_mode = kwargs.get('serial_mode', False)
+        self.edit_batch_size = kwargs.get('edit_batch_size', 100)
+        self.editgroup_description = kwargs.get('editgroup_description')
+        self.editgroup_extra = kwargs.get('editgroup_extra')
+        self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
         self._edit_count = 0
         self._editgroup_id = None
         self._entity_queue = []
@@ -69,7 +71,9 @@ class EntityImporter:
             self.counts['skip'] += 1
             return
         entity = self.parse_record(raw_record)
-        assert entity
+        if not entity:
+            self.counts['skip'] += 1
+            return
         if self.bezerk_mode:
             self.push_entity(entity)
             return
@@ -85,8 +89,8 @@ class EntityImporter:
 
         if self._entity_queue:
             self.insert_batch(self._entity_queue)
-            self.counts['insert'] += len(_entity_queue)
-            self._entity_queue = 0
+            self.counts['insert'] += len(self._entity_queue)
+            self._entity_queue = []
 
         self.counts['total'] = 0
         for key in ('skip', 'insert', 'update', 'exists'):
@@ -101,17 +105,28 @@ class EntityImporter:
 
         if not self._editgroup_id:
             eg = self.api.create_editgroup(
-                description=self._editgroup_description,
-                extra=self._editgroup_extra)
+                fatcat_client.Editgroup(
+                    description=self.editgroup_description,
+                    extra=self.editgroup_extra))
             self._editgroup_id = eg.editgroup_id
 
         self._edit_count += edits
         return self._editgroup_id
 
     def create_container(self, entity):
-        eg = self._get_editgroup()
-        self.counts['sub.container'] += 1
-        return self.api.create_container(entity, editgroup_id=eg.editgroup_id)
+        eg_id = self._get_editgroup()
+        self.counts['inserted.container'] += 1
+        return self.api.create_container(entity, editgroup_id=eg_id)
+
+    def create_release(self, entity):
+        eg_id = self._get_editgroup()
+        self.counts['inserted.release'] += 1
+        return self.api.create_release(entity, editgroup_id=eg_id)
+
+    def create_file(self, entity):
+        eg_id = self._get_editgroup()
+        self.counts['inserted.file'] += 1
+        return self.api.create_file(entity, editgroup_id=eg_id)
 
     def updated(self):
         """
@@ -135,10 +150,10 @@ class EntityImporter:
 
     def parse(self, raw_record):
         """
-        Returns an entity class type. expected to always succeed, or raise an
-        exception (`want()` should be used to filter out bad records). May have
-        side-effects (eg, create related entities), but shouldn't update/mutate
-        the actual entity.
+        Returns an entity class type, or None if we should skip this one.
+
+        May have side-effects (eg, create related entities), but shouldn't
+        update/mutate the actual entity.
         """
         raise NotImplementedError
 
@@ -148,9 +163,10 @@ class EntityImporter:
         update it (PUT), decide we should do nothing (based on the existing
         record), or create a new one.
 
-        Implementations should update exists/updated counts appropriately.
+        Implementations must update the exists/updated/skip counts
+        appropriately in this method.
 
-        Returns boolean: True if
+        Returns boolean: True if the entity should still be inserted, False otherwise
         """
         raise NotImplementedError
 
@@ -230,7 +246,6 @@ class EntityImporter:
         return self._issn_issnl_map.get(issn)
 
-
 class RecordPusher:
     """
     Base class for different importer sources. Pretty trivial interface, just
@@ -252,14 +267,14 @@ class RecordPusher:
         raise NotImplementedError
 
 
-class JsonLinePusher:
+class JsonLinePusher(RecordPusher):
 
-    def __init__(self, importer, in_file, **kwargs):
+    def __init__(self, importer, json_file, **kwargs):
         self.importer = importer
-        self.in_file = in_file
+        self.json_file = json_file
 
     def run(self):
-        for line in self.in_file:
+        for line in self.json_file:
             if not line:
                 continue
             record = json.loads(line)
@@ -267,11 +282,59 @@ class JsonLinePusher:
         print(self.importer.finish())
 
 
-# from: https://docs.python.org/3/library/itertools.html
-def grouper(iterable, n, fillvalue=None):
-    "Collect data into fixed-length chunks or blocks"
-    args = [iter(iterable)] * n
-    return itertools.zip_longest(*args, fillvalue=fillvalue)
+class CsvPusher(RecordPusher):
+
+    def __init__(self, importer, csv_file, **kwargs):
+        self.importer = importer
+        self.reader = csv.DictReader(csv_file, delimiter=kwargs.get('delimiter', ','))
+
+    def run(self):
+        for line in self.reader:
+            if not line:
+                continue
+            self.importer.push_record(line)
+        print(self.importer.finish())
+
+
+class LinePusher(RecordPusher):
+
+    def __init__(self, importer, text_file, **kwargs):
+        self.importer = importer
+        self.text_file = text_file
+
+    def run(self):
+        for line in self.text_file:
+            if not line:
+                continue
+            self.importer.push_record(line)
+        print(self.importer.finish())
+
+
+class KafkaJsonPusher(RecordPusher):
+
+    def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
+        self.importer = importer
+        self.consumer = make_kafka_consumer(
+            kafka_hosts,
+            kafka_env,
+            topic_suffix,
+            group,
+        )
+
+    def run(self):
+        count = 0
+        for msg in self.consumer:
+            if not msg:
+                continue
+            record = json.loads(msg.value.decode('utf-8'))
+            self.importer.push_record(record)
+            count += 1
+            if count % 500 == 0:
+                print("Import counts: {}".format(self.importer.counts))
+        # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
+        # commit the current batch if it has been lingering
+        print(self.importer.finish())
+
 
 def make_kafka_consumer(hosts, env, topic_suffix, group):
     topic_name = "fatcat-{}.{}".format(env, topic_suffix).encode('utf-8')
@@ -288,150 +351,3 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
     )
     return consumer
 
-class FatcatImporter:
-    """
-    Base class for fatcat importers
-    """
-
-    def __init__(self, api, **kwargs):
-
-        eg_extra = kwargs.get('editgroup_extra', dict())
-        eg_extra['git_rev'] = eg_extra.get('git_rev',
-            subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8')
-        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.FatcatImporter')
-
-        self.api = api
-        self._editgroup_description = kwargs.get('editgroup_description')
-        self._editgroup_extra = kwargs.get('editgroup_extra')
-        issn_map_file = kwargs.get('issn_map_file')
-
-        self._issnl_id_map = dict()
-        self._orcid_id_map = dict()
-        self._doi_id_map = dict()
-        if issn_map_file:
-            self.read_issn_map_file(issn_map_file)
-        self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$")
-        self.counts = Counter({'insert': 0, 'update': 0, 'processed_lines': 0})
-
-    def _editgroup(self):
-        eg = fatcat_client.Editgroup(
-            description=self._editgroup_description,
-            extra=self._editgroup_extra,
-        )
-        return self.api.create_editgroup(eg)
-
-    def describe_run(self):
-        print("Processed {} lines, inserted {}, updated {}.".format(
-            self.counts['processed_lines'], self.counts['insert'], self.counts['update']))
-
-    def create_row(self, row, editgroup_id=None):
-        # sub-classes expected to implement this
-        raise NotImplementedError
-
-    def create_batch(self, rows, editgroup_id=None):
-        # sub-classes expected to implement this
-        raise NotImplementedError
-
-    def process_source(self, source, group_size=100):
-        """Creates and auto-accepts editgroup every group_size rows"""
-        eg = self._editgroup()
-        i = 0
-        for i, row in enumerate(source):
-            self.create_row(row, editgroup_id=eg.editgroup_id)
-            if i > 0 and (i % group_size) == 0:
-                self.api.accept_editgroup(eg.editgroup_id)
-                eg = self._editgroup()
-            self.counts['processed_lines'] += 1
-        if i == 0 or (i % group_size) != 0:
-            self.api.accept_editgroup(eg.editgroup_id)
-
-    def process_batch(self, source, size=50, decode_kafka=False):
-        """Reads and processes in batches (not API-call-per-)"""
-        for rows in grouper(source, size):
-            if decode_kafka:
-                rows = [msg.value.decode('utf-8') for msg in rows]
-            self.counts['processed_lines'] += len(rows)
-            #eg = self._editgroup()
-            #self.create_batch(rows, editgroup_id=eg.editgroup_id)
-            self.create_batch(rows)
-
-    def process_csv_source(self, source, group_size=100, delimiter=','):
-        reader = csv.DictReader(source, delimiter=delimiter)
-        self.process_source(reader, group_size)
-
-    def process_csv_batch(self, source, size=50, delimiter=','):
-        reader = csv.DictReader(source, delimiter=delimiter)
-        self.process_batch(reader, size)
-
-    def is_issnl(self, issnl):
-        return len(issnl) == 9 and issnl[4] == '-'
-
-    def lookup_issnl(self, issnl):
-        """Caches calls to the ISSN-L lookup API endpoint in a local dict"""
-        if issnl in self._issnl_id_map:
-            return self._issnl_id_map[issnl]
-        container_id = None
-        try:
-            rv = self.api.lookup_container(issnl=issnl)
-            container_id = rv.ident
-        except ApiException as ae:
-            # If anything other than a 404 (not found), something is wrong
-            assert ae.status == 404
-        self._issnl_id_map[issnl] = container_id # might be None
-        return container_id
-
-    def is_orcid(self, orcid):
-        return self._orcid_regex.match(orcid) is not None
-
-    def lookup_orcid(self, orcid):
-        """Caches calls to the Orcid lookup API endpoint in a local dict"""
-        if not self.is_orcid(orcid):
-            return None
-        if orcid in self._orcid_id_map:
-            return self._orcid_id_map[orcid]
-        creator_id = None
-        try:
-            rv = self.api.lookup_creator(orcid=orcid)
-            creator_id = rv.ident
-        except ApiException as ae:
-            # If anything other than a 404 (not found), something is wrong
-            assert ae.status == 404
-        self._orcid_id_map[orcid] = creator_id # might be None
-        return creator_id
-
-    def is_doi(self, doi):
-        return doi.startswith("10.") and doi.count("/") >= 1
-
-    def lookup_doi(self, doi):
-        """Caches calls to the doi lookup API endpoint in a local dict"""
-        assert self.is_doi(doi)
-        doi = doi.lower()
-        if doi in self._doi_id_map:
-            return self._doi_id_map[doi]
-        release_id = None
-        try:
-            rv = self.api.lookup_release(doi=doi)
-            release_id = rv.ident
-        except ApiException as ae:
-            # If anything other than a 404 (not found), something is wrong
-            assert ae.status == 404
-        self._doi_id_map[doi] = release_id # might be None
-        return release_id
-
-    def read_issn_map_file(self, issn_map_file):
-        print("Loading ISSN map file...")
-        self._issn_issnl_map = dict()
-        for line in issn_map_file:
-            if line.startswith("ISSN") or len(line) == 0:
-                continue
-            (issn, issnl) = line.split()[0:2]
-            self._issn_issnl_map[issn] = issnl
-            # double mapping makes lookups easy
-            self._issn_issnl_map[issnl] = issnl
-        print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map)))
-
-    def issn2issnl(self, issn):
-        if issn is None:
-            return None
-        return self._issn_issnl_map.get(issn)
-
diff --git a/python/fatcat_tools/importers/crossref.py b/python/fatcat_tools/importers/crossref.py
index 6866cb6f..22abd08d 100644
--- a/python/fatcat_tools/importers/crossref.py
+++ b/python/fatcat_tools/importers/crossref.py
@@ -359,9 +359,9 @@ class CrossrefImporter(EntityImporter):
 
     def try_update(self, re):
         # lookup existing DOI (don't need to try other ext idents for crossref)
-        existing_release = None
+        existing = None
         try:
-            existing_release = self.api.lookup_release(doi=re.doi)
+            existing = self.api.lookup_release(doi=re.doi)
         except fatcat_client.rest.ApiException as err:
             if err.status != 404:
                 raise err
@@ -370,12 +370,15 @@ class CrossrefImporter(EntityImporter):
 
         # eventually we'll want to support "updates", but for now just skip if
         # entity already exists
-        if existing_release:
+        if existing:
             self.counts['exists'] += 1
             return False
 
         return True
 
     def insert_batch(self, batch):
-        self.api.create_release_batch(batch, autoaccept=True)
+        self.api.create_release_batch(batch,
+            autoaccept=True,
+            description=self.editgroup_description,
+            extra=json.dumps(self.editgroup_extra))
diff --git a/python/fatcat_tools/importers/grobid_metadata.py b/python/fatcat_tools/importers/grobid_metadata.py
index 5e61a154..c1835b9f 100644
--- a/python/fatcat_tools/importers/grobid_metadata.py
+++ b/python/fatcat_tools/importers/grobid_metadata.py
@@ -5,12 +5,22 @@ import json
 import base64
 import datetime
 import fatcat_client
-from .common import FatcatImporter
+from .common import EntityImporter
 
 MAX_ABSTRACT_BYTES=4096
 
-class GrobidMetadataImporter(FatcatImporter):
+class GrobidMetadataImporter(EntityImporter):
+    """
+    This is a complex case: we need to parse and create both file and release entities.
+
+    The "primary" entity here is really File, not Release. If a matching File
+    exists, we bail in want(); if not we insert the Release during parsing, and
+    insert both.
+
+    TODO: should instead check if the File has any releases; if not, insert and update.
+    TODO: relaxing 'None' constraint on parse_record() might make this refactor-able.
+    """
 
     def __init__(self, api, **kwargs):
 
@@ -23,10 +33,52 @@ class GrobidMetadataImporter(FatcatImporter):
             editgroup_extra=eg_extra)
         self.default_link_rel = kwargs.get("default_link_rel", "web")
 
-    def parse_grobid_json(self, obj):
+    def want(self, raw_record):
+
+        fields = raw_record.split('\t')
+        sha1_key = fields[0]
+        sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower()
+        #cdx = json.loads(fields[1])
+        #mimetype = fields[2]
+        #file_size = int(fields[3])
+        grobid_meta = json.loads(fields[4])
+
+        if not grobid_meta.get('title'):
+            return False
+
+        # lookup existing file SHA1
+        try:
+            existing_file = self.api.lookup_file(sha1=sha1)
+        except fatcat_client.rest.ApiException as err:
+            if err.status != 404:
+                raise err
+            existing_file = None
+
+        # if file is already in here, presumably not actually long-tail
+        # TODO: this is where we should check if the file actually has
+        # release_ids and/or URLs associated with it
+        if existing_file and not self.bezerk_mode:
+            return False
+        return True
+
+    def parse_record(self, row):
+
+        fields = row.split('\t')
+        sha1_key = fields[0]
+        cdx = json.loads(fields[1])
+        mimetype = fields[2]
+        file_size = int(fields[3])
+        grobid_meta = json.loads(fields[4])
+        fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size)
+        re = self.parse_grobid_json(grobid_meta)
+        assert (fe and re)
 
-        if not obj.get('title'):
-            return None
+        release_edit = self.create_release(re)
+        fe.release_ids.append(release_edit.ident)
+        return fe
+
+    def parse_grobid_json(self, obj):
+        assert obj.get('title')
 
         extra = dict()
@@ -122,17 +174,6 @@
 
         sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower()
 
-        # lookup existing SHA1, or create new entity
-        try:
-            existing_file = self.api.lookup_file(sha1=sha1)
-        except fatcat_client.rest.ApiException as err:
-            if err.status != 404:
-                raise err
-            existing_file = None
-
-        if existing_file:
-            # if file is already in here, presumably not actually long-tail
-            return None
 
         fe = fatcat_client.FileEntity(
             sha1=sha1,
             size=int(file_size),
@@ -143,6 +184,7 @@
 
         # parse URLs and CDX
         original = cdx['url']
+        assert len(cdx['dt']) >= 8
         wayback = "https://web.archive.org/web/{}/{}".format(
             cdx['dt'],
             original)
@@ -154,23 +196,13 @@
 
         return fe
 
-    def create_row(self, row, editgroup_id=None):
-        if not row:
-            return
-        fields = row.split('\t')
-        sha1_key = fields[0]
-        cdx = json.loads(fields[1])
-        mimetype = fields[2]
-        file_size = int(fields[3])
-        grobid_meta = json.loads(fields[4])
-        fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size)
-        re = self.parse_grobid_json(grobid_meta)
-        if fe and re:
-            release_entity = self.api.create_release(re, editgroup_id=editgroup_id)
-            # release ident can't already be in release list because we just
-            # created it
-            fe.release_ids.append(release_entity.ident)
-            file_entity = self.api.create_file(fe, editgroup_id=editgroup_id)
-            self.counts['insert'] += 1
-
-    # NB: batch mode not implemented
+    def try_update(entity):
+        # we did this in want()
+        return True
+
+    def insert_batch(self, batch):
+        self.api.create_file_batch(batch,
+            autoaccept=True,
+            description=self.editgroup_description,
+            extra=json.dumps(self.editgroup_extra))
+
diff --git a/python/fatcat_tools/importers/journal_metadata.py b/python/fatcat_tools/importers/journal_metadata.py
index cd058889..ff38cd77 100644
--- a/python/fatcat_tools/importers/journal_metadata.py
+++ b/python/fatcat_tools/importers/journal_metadata.py
@@ -3,7 +3,7 @@ import sys
 import json
 import itertools
 import fatcat_client
-from .common import FatcatImporter
+from .common import EntityImporter
 
 
 def or_none(s):
@@ -25,7 +25,7 @@ def truthy(s):
     else:
         return None
 
-class JournalMetadataImporter(FatcatImporter):
+class JournalMetadataImporter(EntityImporter):
     """
     Imports journal metadata ("containers") by ISSN, currently from a custom
     (data munged) .csv file format
@@ -45,7 +45,12 @@ class JournalMetadataImporter(FatcatImporter):
             editgroup_description=eg_desc,
             editgroup_extra=eg_extra)
 
-    def parse_journal_metadata_row(self, row):
+    def want(self, raw_record):
+        if raw_record.get('ISSN-L'):
+            return True
+        return False
+
+    def parse_record(self, row):
         """
         row is a python dict (parsed from CSV).
         returns a ContainerEntity (or None if invalid or couldn't parse)
@@ -72,16 +77,28 @@
             extra=extra)
         return ce
 
-    def create_row(self, row, editgroup_id=None):
-        ce = self.parse_journal_metadata_row(row)
-        if ce is not None:
-            self.api.create_container(ce, editgroup_id=editgroup_id)
-            self.counts['insert'] += 1
-
-    def create_batch(self, batch):
-        """Reads and processes in batches (not API-call-per-line)"""
-        objects = [self.parse_journal_metadata_row(l)
-                   for l in batch if (l is not None)]
-        objects = [o for o in objects if (o is not None)]
-        self.api.create_container_batch(objects, autoaccept=True)
-        self.counts['insert'] += len(objects)
+    def try_update(self, ce):
+
+        existing = None
+        try:
+            existing = self.api.lookup_container(issnl=ce.issnl)
+        except fatcat_client.rest.ApiException as err:
+            if err.status != 404:
+                raise err
+            # doesn't exist, need to update
+            return True
+
+        # eventually we'll want to support "updates", but for now just skip if
+        # entity already exists
+        if existing:
+            self.counts['exists'] += 1
+            return False
+
+        return True
+
+    def insert_batch(self, batch):
+        self.api.create_container_batch(batch,
+            autoaccept=True,
+            description=self.editgroup_description,
+            extra=json.dumps(self.editgroup_extra))
+
diff --git a/python/fatcat_tools/importers/matched.py b/python/fatcat_tools/importers/matched.py
index 02c0a9c5..2be15860 100644
--- a/python/fatcat_tools/importers/matched.py
+++ b/python/fatcat_tools/importers/matched.py
@@ -4,16 +4,10 @@ import json
 import sqlite3
 import itertools
 import fatcat_client
-from .common import FatcatImporter
+from .common import EntityImporter
 
-#row = row.split('\t')
-#assert len(row) == 2
-#sha1 = row[0].replace('sha1:')
-#sha1 = base64.b16encode(base64.b32decode(sha1)).lower()
-#print(sha1)
-#dois = [d.lower() for d in json.loads(row[1])]
 
-class MatchedImporter(FatcatImporter):
+class MatchedImporter(EntityImporter):
     """
     Importer for "file to crossref DOI" matches.
@@ -59,26 +53,13 @@ class MatchedImporter(FatcatImporter):
             rel = "repository"
         elif "//web.archive.org/" in raw or "//archive.is/" in raw:
             rel = "webarchive"
-        return fatcat_client.FileEntityUrls(url=raw, rel=rel)
+        return (rel, raw)
 
-    def parse_matched_dict(self, obj):
-        sha1 = obj['sha1']
-        dois = [d.lower() for d in obj.get('dois', [])]
+    def want(self, raw_record):
+        return True
 
-        # lookup sha1, or create new entity
-        fe = None
-        if not self.skip_file_updates:
-            try:
-                fe = self.api.lookup_file(sha1=sha1)
-            except fatcat_client.rest.ApiException as err:
-                if err.status != 404:
-                    raise err
-        if fe is None:
-            fe = fatcat_client.FileEntity(
-                sha1=sha1,
-                release_ids=[],
-                urls=[],
-            )
+    def parse_record(self, obj):
+        dois = [d.lower() for d in obj.get('dois', [])]
 
         # lookup dois
         re_list = set()
@@ -93,67 +74,78 @@
                 print("DOI not found: {}".format(doi))
             else:
                 re_list.add(re.ident)
-        if len(re_list) == 0:
+        release_ids = list(re_list)
+        if len(release_ids) == 0:
             return None
-        if fe.release_ids == set(re_list):
-            return None
-        re_list.update(fe.release_ids)
-        fe.release_ids = list(re_list)
 
         # parse URLs and CDX
-        existing_urls = [feu.url for feu in fe.urls]
+        urls = set()
         for url in obj.get('url', []):
-            if url not in existing_urls:
-                url = self.make_url(url)
-                if url != None:
-                    fe.urls.append(url)
+            url = self.make_url(url)
+            if url != None:
+                urls.add(url)
         for cdx in obj.get('cdx', []):
             original = cdx['url']
             wayback = "https://web.archive.org/web/{}/{}".format(
                 cdx['dt'],
                 original)
-            if wayback not in existing_urls:
-                fe.urls.append(
-                    fatcat_client.FileEntityUrls(url=wayback, rel="webarchive"))
-            if original not in existing_urls:
-                url = self.make_url(original)
-                if url != None:
-                    fe.urls.append(url)
-
-        if obj.get('size') != None:
-            fe.size = int(obj['size'])
-        fe.sha256 = obj.get('sha256', fe.sha256)
-        fe.md5 = obj.get('md5', fe.sha256)
-        if obj.get('mimetype') is None:
-            if fe.mimetype is None:
-                fe.mimetype = self.default_mime
-        else:
-            fe.mimetype = obj.get('mimetype')
+            urls.add(("webarchive", wayback))
+            url = self.make_url(original)
+            if url != None:
+                urls.add(url)
+        urls = [fatcat_client.FileEntityUrls(rel, url) for (rel, url) in urls]
+        if len(urls) == 0:
+            return None
+
+        size = obj.get('size')
+        if size:
+            size = int(size)
+
+        fe = fatcat_client.FileEntity(
+            md5=obj.get('md5'),
+            sha1=obj['sha1'],
+            sha256=obj.get('sha256'),
+            size=size,
+            mimetype=obj.get('mimetype'),
+            release_ids=release_ids,
+            urls=urls,
+        )
         return fe
 
-    def create_row(self, row, editgroup_id=None):
-        obj = json.loads(row)
-        fe = self.parse_matched_dict(obj)
-        if fe is not None:
-            if fe.ident is None:
-                self.api.create_file(fe, editgroup_id=editgroup_id)
-                self.counts['insert'] += 1
-            else:
-                self.api.update_file(fe.ident, fe, editgroup_id=editgroup_id)
-                self.counts['update'] += 1
-
-    def create_batch(self, batch):
-        """Reads and processes in batches (not API-call-per-line)"""
-        objects = [self.parse_matched_dict(json.loads(l))
-                   for l in batch if l != None]
-        new_objects = [o for o in objects if o != None and o.ident == None]
-        update_objects = [o for o in objects if o != None and o.ident != None]
-        if len(update_objects):
-            update_eg = self._editgroup().editgroup_id
-            for obj in update_objects:
-                self.api.update_file(obj.ident, obj, editgroup_id=update_eg)
-            self.api.accept_editgroup(update_eg)
-        if len(new_objects) > 0:
-            self.api.create_file_batch(new_objects, autoaccept=True)
-        self.counts['update'] += len(update_objects)
-        self.counts['insert'] += len(new_objects)
+    def try_update(self, fe):
+        # lookup sha1, or create new entity
+        existing = None
+        if not self.skip_file_updates:
+            try:
+                existing = self.api.lookup_file(sha1=fe.sha1)
+            except fatcat_client.rest.ApiException as err:
+                if err.status != 404:
+                    raise err
+
+        if not existing:
+            # new match; go ahead and insert
+            return True
+
+        fe.release_ids = list(set(fe.release_ids + existing.release_ids))
+        if set(fe.release_ids) == set(existing.release_ids) and len(existing.urls) > 0:
+            # no new release matches *and* there are already existing URLs
+            self.counts['exists'] += 1
+            return False
+
+        # merge the existing into this one and update
+        existing.urls = list(set(fe.urls + existing.urls))
+        existing.release_ids = list(set(fe.release_ids + existing.release_ids))
+        existing.mimetype = existing.mimetype or fe.mimetype
+        existing.size = existing.size or fe.size
+        existing.md5 = existing.md5 or fe.md5
+        existing.sha256 = existing.sha256 or fe.sha256
+        self.api.update_file(existing.ident, existing)
+        self.counts['update'] += 1
+        return False
+
+    def insert_batch(self, batch):
+        self.api.create_file_batch(batch,
+            autoaccept=True,
+            description=self.editgroup_description,
+            extra=json.dumps(self.editgroup_extra))
+
diff --git a/python/fatcat_tools/importers/orcid.py b/python/fatcat_tools/importers/orcid.py
index d03b2855..2c39db18 100644
--- a/python/fatcat_tools/importers/orcid.py
+++ b/python/fatcat_tools/importers/orcid.py
@@ -3,7 +3,7 @@ import sys
 import json
 import itertools
 import fatcat_client
-from .common import FatcatImporter
+from .common import EntityImporter
 
 
 def value_or_none(e):
     if type(e) == dict:
@@ -20,7 +20,7 @@ def value_or_none(e):
             return None
     return e
 
-class OrcidImporter(FatcatImporter):
+class OrcidImporter(EntityImporter):
 
     def __init__(self, api, **kwargs):
 
@@ -32,14 +32,16 @@ class OrcidImporter(FatcatImporter):
             editgroup_description=eg_desc,
             editgroup_extra=eg_extra)
 
-    def parse_orcid_dict(self, obj):
+    def want(self, raw_record):
+        return True
+
+    def parse_record(self, obj):
         """
         obj is a python dict (parsed from json).
         returns a CreatorEntity
         """
         name = obj['person']['name']
-        if name is None:
-            return None
+        assert name
         extra = None
         given = value_or_none(name.get('given-names'))
         sur = value_or_none(name.get('family-name'))
@@ -67,17 +69,24 @@ class OrcidImporter(FatcatImporter):
             extra=extra)
         return ce
 
-    def create_row(self, row, editgroup_id=None):
-        obj = json.loads(row)
-        ce = self.parse_orcid_dict(obj)
-        if ce is not None:
-            self.api.create_creator(ce, editgroup_id=editgroup_id)
-            self.counts['insert'] += 1
+    def try_update(self, raw_record):
+        existing = None
+        try:
+            existing = self.api.lookup_creator(orcid=raw_record.orcid)
+        except fatcat_client.rest.ApiException as err:
+            if err.status != 404:
+                raise err
+
+        # eventually we'll want to support "updates", but for now just skip if
+        # entity already exists
+        if existing:
+            self.counts['exists'] += 1
+            return False
+
+        return True
 
-    def create_batch(self, batch):
-        """Reads and processes in batches (not API-call-per-line)"""
-        objects = [self.parse_orcid_dict(json.loads(l))
-                   for l in batch if l != None]
-        objects = [o for o in objects if o != None]
-        self.api.create_creator_batch(objects, autoaccept=True)
-        self.counts['insert'] += len(objects)
+    def insert_batch(self, batch):
+        self.api.create_creator_batch(batch,
+            autoaccept=True,
+            description=self.editgroup_description,
+            extra=json.dumps(self.editgroup_extra))
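
For orientation (not part of the commit): a minimal sketch of what an importer subclass looks like against the new EntityImporter base class. The hook names (want, parse_record, try_update, insert_batch), the counter keys, and the lookup/batch API calls all come from the diff above; the WidgetImporter name, the raw-record fields, and the exact CreatorEntity constructor arguments are assumptions for illustration.

    import json
    import fatcat_client
    from fatcat_tools.importers import EntityImporter

    class WidgetImporter(EntityImporter):
        """Hypothetical importer: one parsed-JSON dict per record -> one creator entity."""

        def want(self, raw_record):
            # cheap pre-filter; the base class bumps the 'skip' count when this returns False
            return bool(raw_record.get('orcid'))

        def parse_record(self, raw_record):
            # build (but do not insert) the entity; returning None also counts as a skip
            return fatcat_client.CreatorEntity(
                orcid=raw_record['orcid'],
                display_name=raw_record.get('name'),
            )

        def try_update(self, ce):
            # True means "go ahead and insert"; this method is responsible for
            # bumping the 'exists'/'update' counts when it short-circuits
            try:
                self.api.lookup_creator(orcid=ce.orcid)
            except fatcat_client.rest.ApiException as err:
                if err.status != 404:
                    raise err
                return True
            self.counts['exists'] += 1
            return False

        def insert_batch(self, batch):
            self.api.create_creator_batch(batch,
                autoaccept=True,
                description=self.editgroup_description,
                extra=json.dumps(self.editgroup_extra))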

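Likewise, a hedged sketch of how the importer/pusher split introduced here is driven end to end: a pusher reads records from a source and feeds importer.push_record(), and finish() flushes the tail of the batch and returns the counts, which run() prints. The API host, client construction, and file path below are assumptions, not something this commit specifies.

    import fatcat_client
    from fatcat_tools.importers import OrcidImporter, JsonLinePusher

    # assumed local API endpoint; the swagger-generated client setup may differ
    conf = fatcat_client.Configuration()
    conf.host = "http://localhost:9411/v0"
    api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf))

    importer = OrcidImporter(api, edit_batch_size=50)
    with open('orcid_summaries.json') as f:   # hypothetical ORCID JSON-lines dump
        JsonLinePusher(importer, f).run()     # prints the final insert/exists/skip counts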