diff options
Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r-- | python/fatcat_tools/importers/common.py | 30 |
1 files changed, 20 insertions, 10 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index b6b20b4c..2d5c89b3 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -34,7 +34,7 @@ class EntityImporter: self.create_<entity>(entity) -> EntityEdit for related entity types self.push_entity(entity) - self.counts['exits'] += 1 + self.counts['exists'] += 1 if didn't update or insert because of existing) self.counts['update'] += 1 if updated an entity @@ -52,16 +52,19 @@ class EntityImporter: self.edit_batch_size = kwargs.get('edit_batch_size', 100) self.editgroup_description = kwargs.get('editgroup_description') self.editgroup_extra = kwargs.get('editgroup_extra') - self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0}) - self._edit_count = 0 - self._editgroup_id = None - self._entity_queue = [] + self.reset() self._issnl_id_map = dict() self._orcid_id_map = dict() self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$") self._doi_id_map = dict() + def reset(self): + self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0}) + self._edit_count = 0 + self._editgroup_id = None + self._entity_queue = [] + def push_record(self, raw_record): """ Returns nothing. @@ -278,7 +281,9 @@ class JsonLinePusher(RecordPusher): continue record = json.loads(line) self.importer.push_record(record) - print(self.importer.finish()) + counts = self.importer.finish() + print(counts) + return counts class CsvPusher(RecordPusher): @@ -292,7 +297,9 @@ class CsvPusher(RecordPusher): if not line: continue self.importer.push_record(line) - print(self.importer.finish()) + counts = self.importer.finish() + print(counts) + return counts class LinePusher(RecordPusher): @@ -306,7 +313,9 @@ class LinePusher(RecordPusher): if not line: continue self.importer.push_record(line) - print(self.importer.finish()) + counts = self.importer.finish() + print(counts) + return counts class KafkaJsonPusher(RecordPusher): @@ -332,7 +341,9 @@ class KafkaJsonPusher(RecordPusher): print("Import counts: {}".format(self.importer.counts)) # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or # commit the current batch if it has been lingering - print(self.importer.finish()) + counts = self.importer.finish() + print(counts) + return counts def make_kafka_consumer(hosts, env, topic_suffix, group): @@ -349,4 +360,3 @@ def make_kafka_consumer(hosts, env, topic_suffix, group): compacted_topic=True, ) return consumer - |