diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/fatcat_tools/importers/common.py | 30 | ||||
-rw-r--r-- | python/fatcat_tools/importers/grobid_metadata.py | 50 | ||||
-rw-r--r-- | python/tests/import_crossref.py | 17 | ||||
-rw-r--r-- | python/tests/import_grobid_metadata.py | 16 | ||||
-rw-r--r-- | python/tests/import_journal_metadata.py | 13 | ||||
-rw-r--r-- | python/tests/import_matched.py | 16 | ||||
-rw-r--r-- | python/tests/import_orcid.py | 19 |
7 files changed, 111 insertions, 50 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index b6b20b4c..2d5c89b3 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -34,7 +34,7 @@ class EntityImporter: self.create_<entity>(entity) -> EntityEdit for related entity types self.push_entity(entity) - self.counts['exits'] += 1 + self.counts['exists'] += 1 if didn't update or insert because of existing) self.counts['update'] += 1 if updated an entity @@ -52,16 +52,19 @@ class EntityImporter: self.edit_batch_size = kwargs.get('edit_batch_size', 100) self.editgroup_description = kwargs.get('editgroup_description') self.editgroup_extra = kwargs.get('editgroup_extra') - self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0}) - self._edit_count = 0 - self._editgroup_id = None - self._entity_queue = [] + self.reset() self._issnl_id_map = dict() self._orcid_id_map = dict() self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$") self._doi_id_map = dict() + def reset(self): + self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0}) + self._edit_count = 0 + self._editgroup_id = None + self._entity_queue = [] + def push_record(self, raw_record): """ Returns nothing. @@ -278,7 +281,9 @@ class JsonLinePusher(RecordPusher): continue record = json.loads(line) self.importer.push_record(record) - print(self.importer.finish()) + counts = self.importer.finish() + print(counts) + return counts class CsvPusher(RecordPusher): @@ -292,7 +297,9 @@ class CsvPusher(RecordPusher): if not line: continue self.importer.push_record(line) - print(self.importer.finish()) + counts = self.importer.finish() + print(counts) + return counts class LinePusher(RecordPusher): @@ -306,7 +313,9 @@ class LinePusher(RecordPusher): if not line: continue self.importer.push_record(line) - print(self.importer.finish()) + counts = self.importer.finish() + print(counts) + return counts class KafkaJsonPusher(RecordPusher): @@ -332,7 +341,9 @@ class KafkaJsonPusher(RecordPusher): print("Import counts: {}".format(self.importer.counts)) # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or # commit the current batch if it has been lingering - print(self.importer.finish()) + counts = self.importer.finish() + print(counts) + return counts def make_kafka_consumer(hosts, env, topic_suffix, group): @@ -349,4 +360,3 @@ def make_kafka_consumer(hosts, env, topic_suffix, group): compacted_topic=True, ) return consumer - diff --git a/python/fatcat_tools/importers/grobid_metadata.py b/python/fatcat_tools/importers/grobid_metadata.py index c1835b9f..4d3b41bc 100644 --- a/python/fatcat_tools/importers/grobid_metadata.py +++ b/python/fatcat_tools/importers/grobid_metadata.py @@ -34,51 +34,47 @@ class GrobidMetadataImporter(EntityImporter): self.default_link_rel = kwargs.get("default_link_rel", "web") def want(self, raw_record): + return True + + def parse_record(self, row): - fields = raw_record.split('\t') + fields = row.split('\t') sha1_key = fields[0] - sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower() - #cdx = json.loads(fields[1]) - #mimetype = fields[2] - #file_size = int(fields[3]) + cdx = json.loads(fields[1]) + mimetype = fields[2] + file_size = int(fields[3]) grobid_meta = json.loads(fields[4]) + fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size) + re = self.parse_grobid_json(grobid_meta) - if not grobid_meta.get('title'): - return False + if not (fe and re): + return None # lookup existing file SHA1 + existing = None try: - existing_file = self.api.lookup_file(sha1=sha1) + existing = self.api.lookup_file(sha1=fe.sha1) except fatcat_client.rest.ApiException as err: if err.status != 404: raise err - existing_file = None # if file is already in here, presumably not actually long-tail + # HACK: this is doing an exists check in parse_record(), which is weird # TODO: this is where we should check if the file actually has # release_ids and/or URLs associated with it - if existing_file and not self.bezerk_mode: - return False - return True - - def parse_record(self, row): - - fields = row.split('\t') - sha1_key = fields[0] - cdx = json.loads(fields[1]) - mimetype = fields[2] - file_size = int(fields[3]) - grobid_meta = json.loads(fields[4]) - fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size) - re = self.parse_grobid_json(grobid_meta) - assert (fe and re) + if existing and not self.bezerk_mode: + self.counts['exists'] += 1 + self.counts['skip'] -= 1 + return None release_edit = self.create_release(re) fe.release_ids.append(release_edit.ident) return fe def parse_grobid_json(self, obj): - assert obj.get('title') + + if not obj.get('title'): + return None extra = dict() @@ -196,8 +192,8 @@ class GrobidMetadataImporter(EntityImporter): return fe - def try_update(entity): - # we did this in want() + def try_update(self, entity): + # did the exists check in 'parse_record()', because we needed to create a release return True def insert_batch(self, batch): diff --git a/python/tests/import_crossref.py b/python/tests/import_crossref.py index 8eeb8072..45123540 100644 --- a/python/tests/import_crossref.py +++ b/python/tests/import_crossref.py @@ -23,7 +23,11 @@ def test_crossref_importer(crossref_importer): last_index = crossref_importer.api.get_changelog(limit=1)[0].index with open('tests/files/crossref-works.2018-01-21.badsample.json', 'r') as f: crossref_importer.bezerk_mode = True - JsonLinePusher(crossref_importer, f).run() + counts = JsonLinePusher(crossref_importer, f).run() + assert counts['insert'] == 14 + assert counts['exists'] == 0 + assert counts['skip'] == 0 + # fetch most recent editgroup change = crossref_importer.api.get_changelog_entry(index=last_index+1) eg = change.editgroup @@ -32,6 +36,14 @@ def test_crossref_importer(crossref_importer): assert eg.extra['git_rev'] assert "fatcat_tools.CrossrefImporter" in eg.extra['agent'] + with open('tests/files/crossref-works.2018-01-21.badsample.json', 'r') as f: + crossref_importer.bezerk_mode = False + crossref_importer.reset() + counts = JsonLinePusher(crossref_importer, f).run() + assert counts['insert'] == 0 + assert counts['exists'] == 14 + assert counts['skip'] == 0 + def test_crossref_mappings(crossref_importer): assert crossref_importer.map_release_type('journal-article') == "article-journal" assert crossref_importer.map_release_type('asdf') is None @@ -41,8 +53,7 @@ def test_crossref_mappings(crossref_importer): def test_crossref_importer_create(crossref_importer): crossref_importer.create_containers = True with open('tests/files/crossref-works.2018-01-21.badsample.json', 'r') as f: - pusher = JsonLinePusher(crossref_importer, f) - pusher.run() + JsonLinePusher(crossref_importer, f).run() def test_crossref_dict_parse(crossref_importer): with open('tests/files/crossref-works.single.json', 'r') as f: diff --git a/python/tests/import_grobid_metadata.py b/python/tests/import_grobid_metadata.py index 698b36be..feb604ce 100644 --- a/python/tests/import_grobid_metadata.py +++ b/python/tests/import_grobid_metadata.py @@ -48,12 +48,15 @@ def test_file_metadata_parse(grobid_metadata_importer): assert fe.urls[0].rel == "webarchive" assert len(fe.release_ids) == 0 -# TODO: use API to check that entities actually created... def test_grobid_metadata_importer(grobid_metadata_importer): last_index = grobid_metadata_importer.api.get_changelog(limit=1)[0].index with open('tests/files/example_grobid_metadata_lines.tsv', 'r') as f: grobid_metadata_importer.bezerk_mode = True - LinePusher(grobid_metadata_importer, f).run() + counts = LinePusher(grobid_metadata_importer, f).run() + assert counts['insert'] == 10 + assert counts['inserted.release'] == 10 + assert counts['exists'] == 0 + assert counts['skip'] == 0 # fetch most recent editgroup change = grobid_metadata_importer.api.get_changelog_entry(index=last_index+1) @@ -62,3 +65,12 @@ def test_grobid_metadata_importer(grobid_metadata_importer): assert "grobid" in eg.description.lower() assert eg.extra['git_rev'] assert "fatcat_tools.GrobidMetadataImporter" in eg.extra['agent'] + + with open('tests/files/example_grobid_metadata_lines.tsv', 'r') as f: + grobid_metadata_importer.reset() + grobid_metadata_importer.bezerk_mode = False + counts = LinePusher(grobid_metadata_importer, f).run() + assert counts['insert'] == 0 + assert counts['inserted.release'] == 0 + assert counts['exists'] == 10 + assert counts['skip'] == 0 diff --git a/python/tests/import_journal_metadata.py b/python/tests/import_journal_metadata.py index 1663da05..a2b10a65 100644 --- a/python/tests/import_journal_metadata.py +++ b/python/tests/import_journal_metadata.py @@ -17,7 +17,10 @@ def test_journal_metadata_importer(journal_metadata_importer): last_index = journal_metadata_importer.api.get_changelog(limit=1)[0].index with open('tests/files/journal_extra_metadata.snip.csv', 'r') as f: journal_metadata_importer.bezerk_mode = True - CsvPusher(journal_metadata_importer, f).run() + counts = CsvPusher(journal_metadata_importer, f).run() + assert counts['insert'] == 9 + assert counts['exists'] == 0 + assert counts['skip'] == 0 # fetch most recent editgroup change = journal_metadata_importer.api.get_changelog_entry(index=last_index+1) @@ -26,3 +29,11 @@ def test_journal_metadata_importer(journal_metadata_importer): assert "container" in eg.description.lower() assert eg.extra['git_rev'] assert "fatcat_tools.JournalMetadataImporter" in eg.extra['agent'] + + with open('tests/files/journal_extra_metadata.snip.csv', 'r') as f: + journal_metadata_importer.reset() + journal_metadata_importer.bezerk_mode = False + counts = CsvPusher(journal_metadata_importer, f).run() + assert counts['insert'] == 0 + assert counts['exists'] == 9 + assert counts['skip'] == 0 diff --git a/python/tests/import_matched.py b/python/tests/import_matched.py index 22bc45ad..8f694456 100644 --- a/python/tests/import_matched.py +++ b/python/tests/import_matched.py @@ -10,7 +10,7 @@ def matched_importer(api): yield MatchedImporter(api) # TODO: use API to check that entities actually created... -def test_matched_importer_batch(matched_importer): +def test_matched_importer(matched_importer): with open('tests/files/example_matched.json', 'r') as f: JsonLinePusher(matched_importer, f).run() @@ -18,7 +18,10 @@ def test_matched_importer(matched_importer): last_index = matched_importer.api.get_changelog(limit=1)[0].index with open('tests/files/example_matched.json', 'r') as f: matched_importer.bezerk_mode = True - JsonLinePusher(matched_importer, f).run() + counts = JsonLinePusher(matched_importer, f).run() + assert counts['insert'] == 2 + assert counts['exists'] == 0 + assert counts['skip'] == 11 # fetch most recent editgroup change = matched_importer.api.get_changelog_entry(index=last_index+1) @@ -28,6 +31,15 @@ def test_matched_importer(matched_importer): assert eg.extra['git_rev'] assert "fatcat_tools.MatchedImporter" in eg.extra['agent'] + # re-insert; should skip + with open('tests/files/example_matched.json', 'r') as f: + matched_importer.reset() + matched_importer.bezerk_mode = False + counts = JsonLinePusher(matched_importer, f).run() + assert counts['insert'] == 0 + assert counts['exists'] == 2 + assert counts['skip'] == 11 + def test_matched_dict_parse(matched_importer): with open('tests/files/example_matched.json', 'r') as f: raw = json.loads(f.readline()) diff --git a/python/tests/import_orcid.py b/python/tests/import_orcid.py index 4055091d..57886b52 100644 --- a/python/tests/import_orcid.py +++ b/python/tests/import_orcid.py @@ -11,15 +11,17 @@ def orcid_importer(api): def test_orcid_importer_badid(orcid_importer): with open('tests/files/0000-0001-8254-710X.json', 'r') as f: - pusher = JsonLinePusher(orcid_importer, f) - pusher.run() + JsonLinePusher(orcid_importer, f).run() # TODO: use API to check that entities actually created... def test_orcid_importer(orcid_importer): last_index = orcid_importer.api.get_changelog(limit=1)[0].index with open('tests/files/0000-0001-8254-7103.json', 'r') as f: orcid_importer.bezerk_mode = True - JsonLinePusher(orcid_importer, f).run() + counts = JsonLinePusher(orcid_importer, f).run() + assert counts['insert'] == 1 + assert counts['exists'] == 0 + assert counts['skip'] == 0 # fetch most recent editgroup change = orcid_importer.api.get_changelog_entry(index=last_index+1) @@ -29,10 +31,17 @@ def test_orcid_importer(orcid_importer): assert eg.extra['git_rev'] assert "fatcat_tools.OrcidImporter" in eg.extra['agent'] + with open('tests/files/0000-0001-8254-7103.json', 'r') as f: + orcid_importer.reset() + orcid_importer.bezerk_mode = False + counts = JsonLinePusher(orcid_importer, f).run() + assert counts['insert'] == 0 + assert counts['exists'] == 1 + assert counts['skip'] == 0 + def test_orcid_importer_x(orcid_importer): with open('tests/files/0000-0003-3953-765X.json', 'r') as f: - pusher = JsonLinePusher(orcid_importer, f) - pusher.run() + JsonLinePusher(orcid_importer, f).run() c = orcid_importer.api.lookup_creator(orcid="0000-0003-3953-765X") assert c is not None |