summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/importers
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r--python/fatcat_tools/importers/common.py30
-rw-r--r--python/fatcat_tools/importers/grobid_metadata.py50
2 files changed, 43 insertions, 37 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index b6b20b4c..2d5c89b3 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -34,7 +34,7 @@ class EntityImporter:
self.create_<entity>(entity) -> EntityEdit
for related entity types
self.push_entity(entity)
- self.counts['exits'] += 1
+ self.counts['exists'] += 1
if didn't update or insert because of existing)
self.counts['update'] += 1
if updated an entity
@@ -52,16 +52,19 @@ class EntityImporter:
self.edit_batch_size = kwargs.get('edit_batch_size', 100)
self.editgroup_description = kwargs.get('editgroup_description')
self.editgroup_extra = kwargs.get('editgroup_extra')
- self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
- self._edit_count = 0
- self._editgroup_id = None
- self._entity_queue = []
+ self.reset()
self._issnl_id_map = dict()
self._orcid_id_map = dict()
self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$")
self._doi_id_map = dict()
+ def reset(self):
+ self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
+ self._edit_count = 0
+ self._editgroup_id = None
+ self._entity_queue = []
+
def push_record(self, raw_record):
"""
Returns nothing.
@@ -278,7 +281,9 @@ class JsonLinePusher(RecordPusher):
continue
record = json.loads(line)
self.importer.push_record(record)
- print(self.importer.finish())
+ counts = self.importer.finish()
+ print(counts)
+ return counts
class CsvPusher(RecordPusher):
@@ -292,7 +297,9 @@ class CsvPusher(RecordPusher):
if not line:
continue
self.importer.push_record(line)
- print(self.importer.finish())
+ counts = self.importer.finish()
+ print(counts)
+ return counts
class LinePusher(RecordPusher):
@@ -306,7 +313,9 @@ class LinePusher(RecordPusher):
if not line:
continue
self.importer.push_record(line)
- print(self.importer.finish())
+ counts = self.importer.finish()
+ print(counts)
+ return counts
class KafkaJsonPusher(RecordPusher):
@@ -332,7 +341,9 @@ class KafkaJsonPusher(RecordPusher):
print("Import counts: {}".format(self.importer.counts))
# TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
# commit the current batch if it has been lingering
- print(self.importer.finish())
+ counts = self.importer.finish()
+ print(counts)
+ return counts
def make_kafka_consumer(hosts, env, topic_suffix, group):
@@ -349,4 +360,3 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
compacted_topic=True,
)
return consumer
-
diff --git a/python/fatcat_tools/importers/grobid_metadata.py b/python/fatcat_tools/importers/grobid_metadata.py
index c1835b9f..4d3b41bc 100644
--- a/python/fatcat_tools/importers/grobid_metadata.py
+++ b/python/fatcat_tools/importers/grobid_metadata.py
@@ -34,51 +34,47 @@ class GrobidMetadataImporter(EntityImporter):
self.default_link_rel = kwargs.get("default_link_rel", "web")
def want(self, raw_record):
+ return True
+
+ def parse_record(self, row):
- fields = raw_record.split('\t')
+ fields = row.split('\t')
sha1_key = fields[0]
- sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower()
- #cdx = json.loads(fields[1])
- #mimetype = fields[2]
- #file_size = int(fields[3])
+ cdx = json.loads(fields[1])
+ mimetype = fields[2]
+ file_size = int(fields[3])
grobid_meta = json.loads(fields[4])
+ fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size)
+ re = self.parse_grobid_json(grobid_meta)
- if not grobid_meta.get('title'):
- return False
+ if not (fe and re):
+ return None
# lookup existing file SHA1
+ existing = None
try:
- existing_file = self.api.lookup_file(sha1=sha1)
+ existing = self.api.lookup_file(sha1=fe.sha1)
except fatcat_client.rest.ApiException as err:
if err.status != 404:
raise err
- existing_file = None
# if file is already in here, presumably not actually long-tail
+ # HACK: this is doing an exists check in parse_record(), which is weird
# TODO: this is where we should check if the file actually has
# release_ids and/or URLs associated with it
- if existing_file and not self.bezerk_mode:
- return False
- return True
-
- def parse_record(self, row):
-
- fields = row.split('\t')
- sha1_key = fields[0]
- cdx = json.loads(fields[1])
- mimetype = fields[2]
- file_size = int(fields[3])
- grobid_meta = json.loads(fields[4])
- fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size)
- re = self.parse_grobid_json(grobid_meta)
- assert (fe and re)
+ if existing and not self.bezerk_mode:
+ self.counts['exists'] += 1
+ self.counts['skip'] -= 1
+ return None
release_edit = self.create_release(re)
fe.release_ids.append(release_edit.ident)
return fe
def parse_grobid_json(self, obj):
- assert obj.get('title')
+
+ if not obj.get('title'):
+ return None
extra = dict()
@@ -196,8 +192,8 @@ class GrobidMetadataImporter(EntityImporter):
return fe
- def try_update(entity):
- # we did this in want()
+ def try_update(self, entity):
+ # did the exists check in 'parse_record()', because we needed to create a release
return True
def insert_batch(self, batch):