Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r--  python/fatcat_tools/importers/common.py           | 30
-rw-r--r--  python/fatcat_tools/importers/grobid_metadata.py  | 50
2 files changed, 43 insertions(+), 37 deletions(-)
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index b6b20b4c..2d5c89b3 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -34,7 +34,7 @@ class EntityImporter:
         self.create_<entity>(entity) -> EntityEdit
             for related entity types
         self.push_entity(entity)
-        self.counts['exits'] += 1
+        self.counts['exists'] += 1
             if didn't update or insert because of existing)
         self.counts['update'] += 1
             if updated an entity
@@ -52,16 +52,19 @@ class EntityImporter:
         self.edit_batch_size = kwargs.get('edit_batch_size', 100)
         self.editgroup_description = kwargs.get('editgroup_description')
         self.editgroup_extra = kwargs.get('editgroup_extra')
-        self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
-        self._edit_count = 0
-        self._editgroup_id = None
-        self._entity_queue = []
+        self.reset()
 
         self._issnl_id_map = dict()
         self._orcid_id_map = dict()
         self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$")
         self._doi_id_map = dict()
 
+    def reset(self):
+        self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
+        self._edit_count = 0
+        self._editgroup_id = None
+        self._entity_queue = []
+
     def push_record(self, raw_record):
         """
         Returns nothing.
@@ -278,7 +281,9 @@ class JsonLinePusher(RecordPusher):
                 continue
             record = json.loads(line)
             self.importer.push_record(record)
-        print(self.importer.finish())
+        counts = self.importer.finish()
+        print(counts)
+        return counts
 
 
 class CsvPusher(RecordPusher):
@@ -292,7 +297,9 @@ class CsvPusher(RecordPusher):
             if not line:
                 continue
             self.importer.push_record(line)
-        print(self.importer.finish())
+        counts = self.importer.finish()
+        print(counts)
+        return counts
 
 
 class LinePusher(RecordPusher):
@@ -306,7 +313,9 @@ class LinePusher(RecordPusher):
             if not line:
                 continue
             self.importer.push_record(line)
-        print(self.importer.finish())
+        counts = self.importer.finish()
+        print(counts)
+        return counts
 
 
 class KafkaJsonPusher(RecordPusher):
@@ -332,7 +341,9 @@ class KafkaJsonPusher(RecordPusher):
             print("Import counts: {}".format(self.importer.counts))
         # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
         # commit the current batch if it has been lingering
-        print(self.importer.finish())
+        counts = self.importer.finish()
+        print(counts)
+        return counts
 
 
 def make_kafka_consumer(hosts, env, topic_suffix, group):
@@ -349,4 +360,3 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
         compacted_topic=True,
     )
     return consumer
-
diff --git a/python/fatcat_tools/importers/grobid_metadata.py b/python/fatcat_tools/importers/grobid_metadata.py
index c1835b9f..4d3b41bc 100644
--- a/python/fatcat_tools/importers/grobid_metadata.py
+++ b/python/fatcat_tools/importers/grobid_metadata.py
@@ -34,51 +34,47 @@ class GrobidMetadataImporter(EntityImporter):
         self.default_link_rel = kwargs.get("default_link_rel", "web")
 
     def want(self, raw_record):
+        return True
+
+    def parse_record(self, row):
 
-        fields = raw_record.split('\t')
+        fields = row.split('\t')
         sha1_key = fields[0]
-        sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower()
-        #cdx = json.loads(fields[1])
-        #mimetype = fields[2]
-        #file_size = int(fields[3])
+        cdx = json.loads(fields[1])
+        mimetype = fields[2]
+        file_size = int(fields[3])
         grobid_meta = json.loads(fields[4])
+        fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size)
+        re = self.parse_grobid_json(grobid_meta)
 
-        if not grobid_meta.get('title'):
-            return False
+        if not (fe and re):
+            return None
 
         # lookup existing file SHA1
+        existing = None
         try:
-            existing_file = self.api.lookup_file(sha1=sha1)
+            existing = self.api.lookup_file(sha1=fe.sha1)
         except fatcat_client.rest.ApiException as err:
             if err.status != 404:
                 raise err
-            existing_file = None
 
         # if file is already in here, presumably not actually long-tail
+        # HACK: this is doing an exists check in parse_record(), which is weird
         # TODO: this is where we should check if the file actually has
         # release_ids and/or URLs associated with it
-        if existing_file and not self.bezerk_mode:
-            return False
-        return True
-
-    def parse_record(self, row):
-
-        fields = row.split('\t')
-        sha1_key = fields[0]
-        cdx = json.loads(fields[1])
-        mimetype = fields[2]
-        file_size = int(fields[3])
-        grobid_meta = json.loads(fields[4])
-        fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size)
-        re = self.parse_grobid_json(grobid_meta)
-        assert (fe and re)
+        if existing and not self.bezerk_mode:
+            self.counts['exists'] += 1
+            self.counts['skip'] -= 1
+            return None
 
         release_edit = self.create_release(re)
         fe.release_ids.append(release_edit.ident)
         return fe
 
     def parse_grobid_json(self, obj):
-        assert obj.get('title')
+
+        if not obj.get('title'):
+            return None
 
         extra = dict()
 
@@ -196,8 +192,8 @@ class GrobidMetadataImporter(EntityImporter):
 
         return fe
 
-    def try_update(entity):
-        # we did this in want()
+    def try_update(self, entity):
+        # did the exists check in 'parse_record()', because we needed to create a release
         return True
 
     def insert_batch(self, batch):
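
With the pusher loops above now returning the counts Counter instead of only printing it, calling code can inspect import outcomes directly, and reset() makes a single importer instance reusable across runs. A minimal usage sketch, assuming the loops shown live in each pusher's run() method, that the import path below matches the package's exports, and that 'api' is an already-configured fatcat client (none of which is shown in this diff):

    import sys
    from fatcat_tools.importers import GrobidMetadataImporter, LinePusher

    importer = GrobidMetadataImporter(api)  # 'api' assumed configured elsewhere
    counts = LinePusher(importer, sys.stdin).run()
    # run() now returns the Counter as well as printing it, so callers
    # (e.g. tests) can assert on outcomes:
    print("insert={} exists={} skip={}".format(
        counts['insert'], counts['exists'], counts['skip']))

    # reset() re-initializes counts, editgroup state, and the entity queue,
    # so the same importer can be pushed to again with fresh statistics:
    importer.reset()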
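
The paired self.counts['exists'] += 1 / self.counts['skip'] -= 1 in parse_record() compensates for bookkeeping in the base class: a None return from parse_record() gets counted as a generic skip by EntityImporter.push_record(), so the importer rebooks that record as 'exists'. A rough sketch of the base-class flow this relies on, reconstructed from the docstring in common.py above (the exact body is an assumption):

    def push_record(self, raw_record):
        # sketch of EntityImporter.push_record() (assumed, not shown in this diff)
        if (not raw_record) or (not self.want(raw_record)):
            self.counts['skip'] += 1
            return
        entity = self.parse_record(raw_record)
        if not entity:
            # a None from parse_record() lands here as a generic 'skip';
            # GrobidMetadataImporter pre-decrements 'skip' when the real
            # outcome is 'exists', so the net effect is exists+1, skip+0
            self.counts['skip'] += 1
            return
        if self.try_update(entity):
            self.push_entity(entity)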