diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/fatcat_tools/importers/__init__.py | 15 | ||||
| -rw-r--r-- | python/fatcat_tools/importers/common.py | 166 | 
2 files changed, 181 insertions, 0 deletions
| diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 47fc1fd3..f2caca5c 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -1,7 +1,22 @@ +""" +To run an import you combine two classes; one each of: + +- RecordSource: somehow iterates over a source of raw records (eg, from a +  database, Kafka, files on disk, stdin) and pushes into an entity importer. +- EntityImporter: class that a record iterator pushes raw (unparsed) records +  into. The entity importer parses and decides what to do (ignore, update, +  insert, etc). There is usually a primary entity type, though related entities +  can be created along the way. Maintains API connection and editgroup/batch +  state. + +""" +  from .common import FatcatImporter, make_kafka_consumer  from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP  from .grobid_metadata import GrobidMetadataImporter  from .journal_metadata import JournalMetadataImporter  from .matched import MatchedImporter  from .orcid import OrcidImporter +from .kafka_source import KafkaSource +from .file_source import FileSource diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 06897bee..25ee4727 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -12,6 +12,171 @@ import fatcat_client  from fatcat_client.rest import ApiException +class EntityImporter: +    """ +    Base class for fatcat entity importers. +     +    The API exposed to record iterator is: + +        push_record(raw_record) +        finish() + +    The API that implementations are expected to fill in are: + +        want(raw_record) -> boolean +        parse(raw_record) -> entity +        try_update(entity) -> boolean +        insert_batch([entity]) -> None + +    This class exposes helpers for implementations: + +        self.api +        self.create_related_*(entity) for all entity types +        self.push_entity(entity) +        self.counts['exits'] += 1 (if didn't update or insert because of existing) +        self.counts['update'] += 1 (if updated an entity) +    """ + +    def __init__(self, api, **kwargs): + +        eg_extra = kwargs.get('editgroup_extra', dict()) +        eg_extra['git_rev'] = eg_extra.get('git_rev', +            subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') +        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityImporter') +         +        self.api = api +        self.bezerk_mode = kwargs.get('bezerk_mode', False) +        self._editgroup_description = kwargs.get('editgroup_description') +        self._editgroup_extra = kwargs.get('editgroup_extra') +        self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0, 'sub.container': 0}) +        self._edit_count = 0 +        self._editgroup_id = None +        self._entity_queue = [] + +    def push_record(self, raw_record): +        """ +        Returns nothing. +        """ +        if (not raw_record) or (not self.want(raw_record): +            self.counts['skip'] += 1 +            return +        entity = self.parse_record(raw_record) +        if self.bezerk_mode: +            self.push_entity(entity) +            return +        if self.try_update(entity): +            self.push_entity(entity) +        return + +    def finish(self, raw_record): +        if self._edit_count > 0: +            self.api.accept_editgroup(self._editgroup_id) +            self._editgroup_id = None +            self._edit_count = 0 + +        if self._entity_queue: +            self.insert_batch(self._entity_queue) +            self.counts['insert'] += len(_entity_queue) +            self._entity_queue = 0 + +        self.counts['total'] = counts['skip'] + counts['insert'] + \ +            counts['update'] + counts['exists'] +        return self.counts + +    def _get_editgroup(self, edits=1): +        if self._edit_count >= self.edit_batch_size: +            self.api.accept_editgroup(self._editgroup_id) +            self._editgroup_id = None +            self._edit_count = 0 + +        if not self._editgroup_id: +            eg = self.api.create_editgroup( +                description=self._editgroup_description, +                extra=self._editgroup_extra) +            self._editgroup_id = eg.editgroup_id + +        self._edit_count += edits +        return self._editgroup_id + +    def create_container(self, entity): +        eg = self._get_editgroup() +        self.api.create_container(entity, editgroup_id=eg.editgroup_id) +        self.counts['sub.container'] += 1 + +    def updated(self): +        """ +        Implementations should call this from try_update() if the update was successful +        """ +        self.counts['update'] += 1 + +    def push_entity(self, entity): +        self._entity_queue.append(entity) +        if len(self._entity_queue) >= self.edit_batch_size: +            self.insert_batch(self._entity_queue) +            self.counts['insert'] += len(_entity_queue) +            self._entity_queue = 0 + +    def want(self, raw_record): +        """ +        Implementations can override for optional fast-path to drop a record. +        Must have no side-effects; returns bool. +        """ +        return True + +    def parse(self, raw_record): +        """ +        Returns an entity class type. expected to always succeed, or raise an +        exception (`want()` should be used to filter out bad records). May have +        side-effects (eg, create related entities), but shouldn't update/mutate +        the actual entity. +        """ +        raise NotImplementedError + +    def try_update(self, raw_record): +        """ +        Passed the output of parse(). Should try to find an existing entity and +        update it (PUT), decide we should do nothing (based on the existing +        record), or create a new one. + +        Implementations should update exists/updated counts appropriately. + +        Returns boolean: True if  +        """ +        raise NotImplementedError + +    def insert_batch(self, raw_record): +        raise NotImplementedError + + +class RecordPusher: +    """ +    Base class for different importer sources. Pretty trivial interface, just +    wraps an importer and pushes records in to it. +    """ + +    def __init__(self, importer, **kwargs): + +        eg_extra = kwargs.get('editgroup_extra', dict()) +        eg_extra['git_rev'] = eg_extra.get('git_rev', +            subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') +        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityImporter') +         +        self.api = api +        self.bezerk_mode = kwargs.get('bezerk_mode', False) +        self._editgroup_description = kwargs.get('editgroup_description') + +    def run(self): +        """ +        This will look something like: + +            for line in sys.stdin: +                record = json.loads(line) +                self.importer.push_record(record) +            print(self.importer.finish()) +        """ +        raise NotImplementedError + +  # from: https://docs.python.org/3/library/itertools.html  def grouper(iterable, n, fillvalue=None):      "Collect data into fixed-length chunks or blocks" @@ -179,3 +344,4 @@ class FatcatImporter:          if issn is None:              return None          return self._issn_issnl_map.get(issn) + | 
