author    Bryan Newbold <bnewbold@robocracy.org>  2019-01-23 15:02:03 -0800
committer Bryan Newbold <bnewbold@robocracy.org>  2019-01-23 15:02:03 -0800
commit    1443f05faebd9e697086132694401f6a6c42d9b5 (patch)
tree      8da8b8e7f4c957c5edccefe9188741c15697cd46 /python/fatcat_tools/importers/common.py
parent    1fa8f820fd3b7c64d424f55796d2b860d22e4b22 (diff)
more tests; fix some importer behavior
Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r--  python/fatcat_tools/importers/common.py  30
1 file changed, 20 insertions(+), 10 deletions(-)
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index b6b20b4c..2d5c89b3 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -34,7 +34,7 @@ class EntityImporter:
         self.create_<entity>(entity) -> EntityEdit
             for related entity types
         self.push_entity(entity)
-        self.counts['exits'] += 1
+        self.counts['exists'] += 1
             if didn't update or insert because of existing)
         self.counts['update'] += 1
             if updated an entity
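This first hunk only corrects a typo in the EntityImporter docstring: the counter key is 'exists', not 'exits'. For context, here is a minimal sketch (not part of this commit) of how a concrete importer is expected to bump that counter from try_update(); the subclass name and the lookup helper are hypothetical, not real fatcat APIs.

    class DummyImporter(EntityImporter):

        def try_update(self, entity):
            # hypothetical lookup; real importers query the fatcat API here
            existing = self.lookup_existing(entity)
            if existing is not None:
                self.counts['exists'] += 1   # the key this hunk corrects
                return False                 # nothing to insert or update
            return True                      # caller queues the entity for insert_batch()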
@@ -52,16 +52,19 @@ class EntityImporter:
         self.edit_batch_size = kwargs.get('edit_batch_size', 100)
         self.editgroup_description = kwargs.get('editgroup_description')
         self.editgroup_extra = kwargs.get('editgroup_extra')
-        self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
-        self._edit_count = 0
-        self._editgroup_id = None
-        self._entity_queue = []
+        self.reset()
 
         self._issnl_id_map = dict()
         self._orcid_id_map = dict()
         self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$")
         self._doi_id_map = dict()
 
+    def reset(self):
+        self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
+        self._edit_count = 0
+        self._editgroup_id = None
+        self._entity_queue = []
+
     def push_record(self, raw_record):
         """
         Returns nothing.
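This hunk moves the per-run state (counts, edit counter, editgroup id, entity queue) out of __init__ into a new reset() method, so a single importer instance can be reused with a clean slate between runs. A minimal sketch of the reuse this enables, assuming SomeImporter is a concrete EntityImporter subclass, api is a configured fatcat API client, and JsonLinePusher takes (importer, file) and exposes the loop shown below as run():

    importer = SomeImporter(api)
    for path in ('batch1.json', 'batch2.json'):
        importer.reset()              # fresh counts, editgroup id, entity queue
        with open(path) as f:
            counts = JsonLinePusher(importer, f).run()
        print(path, counts)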
@@ -278,7 +281,9 @@ class JsonLinePusher(RecordPusher):
                 continue
             record = json.loads(line)
             self.importer.push_record(record)
-        print(self.importer.finish())
+        counts = self.importer.finish()
+        print(counts)
+        return counts
 
 
 class CsvPusher(RecordPusher):
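The pusher hunks (JsonLinePusher above; CsvPusher, LinePusher, and KafkaJsonPusher below) all make the same change: the final counts are still printed, but are now also returned, so callers and tests can assert on import results instead of scraping stdout. A minimal sketch of a test using the returned Counter; the importer class, file path, and expected numbers are illustrative assumptions:

    def test_json_line_pusher(api):
        importer = SomeImporter(api)    # hypothetical concrete importer
        with open('tests/files/sample.json') as f:
            counts = JsonLinePusher(importer, f).run()
        # counts is the importer's Counter: 'skip', 'insert', 'update', 'exists'
        assert counts['insert'] > 0
        assert counts['skip'] == 0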
@@ -292,7 +297,9 @@ class CsvPusher(RecordPusher):
             if not line:
                 continue
             self.importer.push_record(line)
-        print(self.importer.finish())
+        counts = self.importer.finish()
+        print(counts)
+        return counts
 
 
 class LinePusher(RecordPusher):
@@ -306,7 +313,9 @@ class LinePusher(RecordPusher):
             if not line:
                 continue
             self.importer.push_record(line)
-        print(self.importer.finish())
+        counts = self.importer.finish()
+        print(counts)
+        return counts
 
 
 class KafkaJsonPusher(RecordPusher):
@@ -332,7 +341,9 @@ class KafkaJsonPusher(RecordPusher):
print("Import counts: {}".format(self.importer.counts))
# TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
# commit the current batch if it has been lingering
- print(self.importer.finish())
+ counts = self.importer.finish()
+ print(counts)
+ return counts
def make_kafka_consumer(hosts, env, topic_suffix, group):
@@ -349,4 +360,3 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
         compacted_topic=True,
     )
     return consumer
-