summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/importers
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r--python/fatcat_tools/importers/__init__.py15
-rw-r--r--python/fatcat_tools/importers/common.py166
2 files changed, 181 insertions, 0 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 47fc1fd3..f2caca5c 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -1,7 +1,22 @@
+"""
+To run an import you combine two classes; one each of:
+
+- RecordSource: somehow iterates over a source of raw records (eg, from a
+ database, Kafka, files on disk, stdin) and pushes into an entity importer.
+- EntityImporter: class that a record iterator pushes raw (unparsed) records
+ into. The entity importer parses and decides what to do (ignore, update,
+ insert, etc). There is usually a primary entity type, though related entities
+ can be created along the way. Maintains API connection and editgroup/batch
+ state.
+
+"""
+
from .common import FatcatImporter, make_kafka_consumer
from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP
from .grobid_metadata import GrobidMetadataImporter
from .journal_metadata import JournalMetadataImporter
from .matched import MatchedImporter
from .orcid import OrcidImporter
+from .kafka_source import KafkaSource
+from .file_source import FileSource
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 06897bee..25ee4727 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -12,6 +12,171 @@ import fatcat_client
from fatcat_client.rest import ApiException
+class EntityImporter:
+    """
+    Base class for fatcat entity importers.
+
+    The API exposed to record iterators is:
+
+        push_record(raw_record)
+        finish()
+
+    The methods that implementations are expected to fill in are:
+
+        want(raw_record) -> boolean
+        parse(raw_record) -> entity
+        try_update(entity) -> boolean
+        insert_batch([entity]) -> None
+
+    This class exposes helpers for implementations:
+
+        self.api
+        self.create_container(entity) (the only related-entity helper so far)
+        self.push_entity(entity)
+        self.counts['exists'] += 1 (if didn't update or insert because of existing)
+        self.updated() (if updated an entity; bumps self.counts['update'])
+
+    Keyword arguments read by the constructor:
+
+        editgroup_extra: dict of extra editgroup metadata ('git_rev' and
+            'agent' are filled in when missing)
+        editgroup_description: passed through to create_editgroup()
+        edit_batch_size: max edits per editgroup, and max queued entities per
+            insert_batch() call (default: 100)
+        bezerk_mode: if true, skip try_update() and queue every parsed entity
+            for insertion (default: False)
+    """
+
+    def __init__(self, api, **kwargs):
+
+        # Work on a copy so the caller's dict is never mutated.
+        eg_extra = dict(kwargs.get('editgroup_extra') or {})
+        git_rev = eg_extra.get('git_rev')
+        if git_rev is None:
+            # Only shell out to git when the caller did not supply a revision.
+            git_rev = subprocess.check_output(
+                ["git", "describe", "--always"]).strip()
+        if isinstance(git_rev, bytes):
+            # check_output() returns bytes; a caller-supplied str is kept as-is.
+            git_rev = git_rev.decode('utf-8')
+        eg_extra['git_rev'] = git_rev
+        eg_extra.setdefault('agent', 'fatcat_tools.EntityImporter')
+
+        self.api = api
+        self.bezerk_mode = kwargs.get('bezerk_mode', False)
+        self.edit_batch_size = kwargs.get('edit_batch_size', 100)
+        self._editgroup_description = kwargs.get('editgroup_description')
+        # Store the completed dict (with git_rev/agent), not the raw kwarg.
+        self._editgroup_extra = eg_extra
+        self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0, 'sub.container': 0})
+        self._edit_count = 0
+        self._editgroup_id = None
+        self._entity_queue = []
+
+    def push_record(self, raw_record):
+        """
+        Filters, parses, and (unless try_update() says otherwise) queues a
+        single raw record for insertion.
+
+        Returns nothing.
+        """
+        if (not raw_record) or (not self.want(raw_record)):
+            self.counts['skip'] += 1
+            return
+        entity = self.parse(raw_record)
+        if self.bezerk_mode:
+            # bezerk mode: never look for an existing entity, always insert
+            self.push_entity(entity)
+            return
+        if self.try_update(entity):
+            self.push_entity(entity)
+
+    def finish(self, raw_record=None):
+        """
+        Accepts any open editgroup, flushes the remaining queued entities via
+        insert_batch(), and returns the counts (including a 'total' key).
+
+        `raw_record` is unused; it is only accepted so that callers passing an
+        argument keep working.
+        """
+        if self._edit_count > 0:
+            self.api.accept_editgroup(self._editgroup_id)
+            self._editgroup_id = None
+            self._edit_count = 0
+
+        if self._entity_queue:
+            self._flush_queue()
+
+        self.counts['total'] = sum(
+            self.counts[key] for key in ('skip', 'insert', 'update', 'exists'))
+        return self.counts
+
+    def _flush_queue(self):
+        """
+        Hands the queued entities to insert_batch(), counts them as inserted,
+        and starts a fresh (empty) queue.
+        """
+        batch = self._entity_queue
+        self.insert_batch(batch)
+        self.counts['insert'] += len(batch)
+        # New list (not 0, not .clear()): the queue must stay appendable and
+        # insert_batch() may have kept a reference to the old one.
+        self._entity_queue = []
+
+    def _get_editgroup(self, edits=1):
+        """
+        Returns the id of the current editgroup, creating one if none is open.
+
+        If the running edit count has reached edit_batch_size, the current
+        editgroup is accepted first and a new one is created. `edits` is added
+        to the running edit count.
+        """
+        if self._edit_count >= self.edit_batch_size:
+            self.api.accept_editgroup(self._editgroup_id)
+            self._editgroup_id = None
+            self._edit_count = 0
+
+        if not self._editgroup_id:
+            eg = self.api.create_editgroup(
+                description=self._editgroup_description,
+                extra=self._editgroup_extra)
+            self._editgroup_id = eg.editgroup_id
+
+        self._edit_count += edits
+        return self._editgroup_id
+
+    def create_container(self, entity):
+        """
+        Creates a container entity in the current editgroup and bumps the
+        'sub.container' count.
+        """
+        # _get_editgroup() already returns the id, not the editgroup object
+        editgroup_id = self._get_editgroup()
+        self.api.create_container(entity, editgroup_id=editgroup_id)
+        self.counts['sub.container'] += 1
+
+    def updated(self):
+        """
+        Implementations should call this from try_update() if the update was successful
+        """
+        self.counts['update'] += 1
+
+    def push_entity(self, entity):
+        """
+        Queues an entity for batch insertion; flushes the queue through
+        insert_batch() once it holds edit_batch_size entities.
+        """
+        self._entity_queue.append(entity)
+        if len(self._entity_queue) >= self.edit_batch_size:
+            self._flush_queue()
+
+    def want(self, raw_record):
+        """
+        Implementations can override for optional fast-path to drop a record.
+        Must have no side-effects; returns bool.
+        """
+        return True
+
+    def parse(self, raw_record):
+        """
+        Returns an entity class type. expected to always succeed, or raise an
+        exception (`want()` should be used to filter out bad records). May have
+        side-effects (eg, create related entities), but shouldn't update/mutate
+        the actual entity.
+        """
+        raise NotImplementedError
+
+    def try_update(self, raw_record):
+        """
+        Passed the output of parse() (the parameter is named `raw_record`, but
+        push_record() hands it the parsed entity). Should try to find an
+        existing entity and update it (PUT), decide we should do nothing
+        (based on the existing record), or create a new one.
+
+        Implementations should update exists/updated counts appropriately.
+
+        Returns boolean: True if the entity should still be queued for
+        insertion by push_record(), False if nothing more should be done.
+        """
+        raise NotImplementedError
+
+    def insert_batch(self, raw_record):
+        """
+        Passed the list of queued entities (the parameter is named
+        `raw_record`, but push_entity()/finish() hand it the entity queue).
+        Should insert all of them; the caller bumps the 'insert' count.
+        """
+        raise NotImplementedError
+
+
+class RecordPusher:
+    """
+    Base class for different importer sources. Pretty trivial interface, just
+    wraps an importer and pushes records in to it.
+    """
+
+    def __init__(self, importer, **kwargs):
+        """
+        `importer` is the object records get pushed into; run() is expected to
+        call its push_record() and finish() methods.
+        """
+
+        self.importer = importer
+        # There is no `api` argument here; expose the importer's client (if it
+        # has one) so the attribute still exists.
+        self.api = getattr(importer, 'api', None)
+        self.bezerk_mode = kwargs.get('bezerk_mode', False)
+        self._editgroup_description = kwargs.get('editgroup_description')
+
+    def run(self):
+        """
+        This will look something like:
+
+            for line in sys.stdin:
+                record = json.loads(line)
+                self.importer.push_record(record)
+            print(self.importer.finish())
+        """
+        raise NotImplementedError
+
+
# from: https://docs.python.org/3/library/itertools.html
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
@@ -179,3 +344,4 @@ class FatcatImporter:
if issn is None:
return None
return self._issn_issnl_map.get(issn)
+