diff options
-rw-r--r-- | python/fatcat_tools/importers/__init__.py | 15 | ||||
-rw-r--r-- | python/fatcat_tools/importers/common.py | 166 |
2 files changed, 181 insertions, 0 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 47fc1fd3..f2caca5c 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -1,7 +1,22 @@ +""" +To run an import you combine two classes; one each of: + +- RecordSource: somehow iterates over a source of raw records (eg, from a + database, Kafka, files on disk, stdin) and pushes into an entity importer. +- EntityImporter: class that a record iterator pushes raw (unparsed) records + into. The entity importer parses and decides what to do (ignore, update, + insert, etc). There is usually a primary entity type, though related entities + can be created along the way. Maintains API connection and editgroup/batch + state. + +""" + from .common import FatcatImporter, make_kafka_consumer from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP from .grobid_metadata import GrobidMetadataImporter from .journal_metadata import JournalMetadataImporter from .matched import MatchedImporter from .orcid import OrcidImporter +from .kafka_source import KafkaSource +from .file_source import FileSource diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 06897bee..25ee4727 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -12,6 +12,171 @@ import fatcat_client from fatcat_client.rest import ApiException +class EntityImporter: + """ + Base class for fatcat entity importers. + + The API exposed to record iterator is: + + push_record(raw_record) + finish() + + The API that implementations are expected to fill in are: + + want(raw_record) -> boolean + parse(raw_record) -> entity + try_update(entity) -> boolean + insert_batch([entity]) -> None + + This class exposes helpers for implementations: + + self.api + self.create_related_*(entity) for all entity types + self.push_entity(entity) + self.counts['exits'] += 1 (if didn't update or insert because of existing) + self.counts['update'] += 1 (if updated an entity) + """ + + def __init__(self, api, **kwargs): + + eg_extra = kwargs.get('editgroup_extra', dict()) + eg_extra['git_rev'] = eg_extra.get('git_rev', + subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityImporter') + + self.api = api + self.bezerk_mode = kwargs.get('bezerk_mode', False) + self._editgroup_description = kwargs.get('editgroup_description') + self._editgroup_extra = kwargs.get('editgroup_extra') + self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0, 'sub.container': 0}) + self._edit_count = 0 + self._editgroup_id = None + self._entity_queue = [] + + def push_record(self, raw_record): + """ + Returns nothing. + """ + if (not raw_record) or (not self.want(raw_record): + self.counts['skip'] += 1 + return + entity = self.parse_record(raw_record) + if self.bezerk_mode: + self.push_entity(entity) + return + if self.try_update(entity): + self.push_entity(entity) + return + + def finish(self, raw_record): + if self._edit_count > 0: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + + if self._entity_queue: + self.insert_batch(self._entity_queue) + self.counts['insert'] += len(_entity_queue) + self._entity_queue = 0 + + self.counts['total'] = counts['skip'] + counts['insert'] + \ + counts['update'] + counts['exists'] + return self.counts + + def _get_editgroup(self, edits=1): + if self._edit_count >= self.edit_batch_size: + self.api.accept_editgroup(self._editgroup_id) + self._editgroup_id = None + self._edit_count = 0 + + if not self._editgroup_id: + eg = self.api.create_editgroup( + description=self._editgroup_description, + extra=self._editgroup_extra) + self._editgroup_id = eg.editgroup_id + + self._edit_count += edits + return self._editgroup_id + + def create_container(self, entity): + eg = self._get_editgroup() + self.api.create_container(entity, editgroup_id=eg.editgroup_id) + self.counts['sub.container'] += 1 + + def updated(self): + """ + Implementations should call this from try_update() if the update was successful + """ + self.counts['update'] += 1 + + def push_entity(self, entity): + self._entity_queue.append(entity) + if len(self._entity_queue) >= self.edit_batch_size: + self.insert_batch(self._entity_queue) + self.counts['insert'] += len(_entity_queue) + self._entity_queue = 0 + + def want(self, raw_record): + """ + Implementations can override for optional fast-path to drop a record. + Must have no side-effects; returns bool. + """ + return True + + def parse(self, raw_record): + """ + Returns an entity class type. expected to always succeed, or raise an + exception (`want()` should be used to filter out bad records). May have + side-effects (eg, create related entities), but shouldn't update/mutate + the actual entity. + """ + raise NotImplementedError + + def try_update(self, raw_record): + """ + Passed the output of parse(). Should try to find an existing entity and + update it (PUT), decide we should do nothing (based on the existing + record), or create a new one. + + Implementations should update exists/updated counts appropriately. + + Returns boolean: True if + """ + raise NotImplementedError + + def insert_batch(self, raw_record): + raise NotImplementedError + + +class RecordPusher: + """ + Base class for different importer sources. Pretty trivial interface, just + wraps an importer and pushes records in to it. + """ + + def __init__(self, importer, **kwargs): + + eg_extra = kwargs.get('editgroup_extra', dict()) + eg_extra['git_rev'] = eg_extra.get('git_rev', + subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8') + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityImporter') + + self.api = api + self.bezerk_mode = kwargs.get('bezerk_mode', False) + self._editgroup_description = kwargs.get('editgroup_description') + + def run(self): + """ + This will look something like: + + for line in sys.stdin: + record = json.loads(line) + self.importer.push_record(record) + print(self.importer.finish()) + """ + raise NotImplementedError + + # from: https://docs.python.org/3/library/itertools.html def grouper(iterable, n, fillvalue=None): "Collect data into fixed-length chunks or blocks" @@ -179,3 +344,4 @@ class FatcatImporter: if issn is None: return None return self._issn_issnl_map.get(issn) + |