| author | Bryan Newbold <bnewbold@robocracy.org> | 2019-01-23 15:02:03 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-01-23 15:02:03 -0800 | 
| commit | 1443f05faebd9e697086132694401f6a6c42d9b5 (patch) | |
| tree | 8da8b8e7f4c957c5edccefe9188741c15697cd46 /python/fatcat_tools/importers | |
| parent | 1fa8f820fd3b7c64d424f55796d2b860d22e4b22 (diff) | |
| download | fatcat-1443f05faebd9e697086132694401f6a6c42d9b5.tar.gz fatcat-1443f05faebd9e697086132694401f6a6c42d9b5.zip | |
more tests; fix some importer behavior
Diffstat (limited to 'python/fatcat_tools/importers')
| -rw-r--r-- | python/fatcat_tools/importers/common.py | 30 |
| -rw-r--r-- | python/fatcat_tools/importers/grobid_metadata.py | 50 |
2 files changed, 43 insertions, 37 deletions
```diff
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index b6b20b4c..2d5c89b3 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -34,7 +34,7 @@ class EntityImporter:
         self.create_<entity>(entity) -> EntityEdit
             for related entity types
         self.push_entity(entity)
-        self.counts['exits'] += 1
+        self.counts['exists'] += 1
             if didn't update or insert because of existing)
         self.counts['update'] += 1
             if updated an entity
@@ -52,16 +52,19 @@ class EntityImporter:
         self.edit_batch_size = kwargs.get('edit_batch_size', 100)
         self.editgroup_description = kwargs.get('editgroup_description')
         self.editgroup_extra = kwargs.get('editgroup_extra')
-        self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
-        self._edit_count = 0
-        self._editgroup_id = None
-        self._entity_queue = []
+        self.reset()
         self._issnl_id_map = dict()
         self._orcid_id_map = dict()
         self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$")
         self._doi_id_map = dict()
 
+    def reset(self):
+        self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
+        self._edit_count = 0
+        self._editgroup_id = None
+        self._entity_queue = []
+
     def push_record(self, raw_record):
         """
         Returns nothing.
@@ -278,7 +281,9 @@ class JsonLinePusher(RecordPusher):
                 continue
             record = json.loads(line)
             self.importer.push_record(record)
-        print(self.importer.finish())
+        counts = self.importer.finish()
+        print(counts)
+        return counts
 
 
 class CsvPusher(RecordPusher):
@@ -292,7 +297,9 @@ class CsvPusher(RecordPusher):
             if not line:
                 continue
             self.importer.push_record(line)
-        print(self.importer.finish())
+        counts = self.importer.finish()
+        print(counts)
+        return counts
 
 
 class LinePusher(RecordPusher):
@@ -306,7 +313,9 @@ class LinePusher(RecordPusher):
             if not line:
                 continue
             self.importer.push_record(line)
-        print(self.importer.finish())
+        counts = self.importer.finish()
+        print(counts)
+        return counts
 
 
 class KafkaJsonPusher(RecordPusher):
@@ -332,7 +341,9 @@ class KafkaJsonPusher(RecordPusher):
                 print("Import counts: {}".format(self.importer.counts))
         # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
         # commit the current batch if it has been lingering
-        print(self.importer.finish())
+        counts = self.importer.finish()
+        print(counts)
+        return counts
 
 
 def make_kafka_consumer(hosts, env, topic_suffix, group):
@@ -349,4 +360,3 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
         compacted_topic=True,
     )
     return consumer
-
diff --git a/python/fatcat_tools/importers/grobid_metadata.py b/python/fatcat_tools/importers/grobid_metadata.py
index c1835b9f..4d3b41bc 100644
--- a/python/fatcat_tools/importers/grobid_metadata.py
+++ b/python/fatcat_tools/importers/grobid_metadata.py
@@ -34,51 +34,47 @@ class GrobidMetadataImporter(EntityImporter):
         self.default_link_rel = kwargs.get("default_link_rel", "web")
 
     def want(self, raw_record):
+        return True
+
+    def parse_record(self, row):
 
-        fields = raw_record.split('\t')
+        fields = row.split('\t')
         sha1_key = fields[0]
-        sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower()
-        #cdx = json.loads(fields[1])
-        #mimetype = fields[2]
-        #file_size = int(fields[3])
+        cdx = json.loads(fields[1])
+        mimetype = fields[2]
+        file_size = int(fields[3])
         grobid_meta = json.loads(fields[4])
+        fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size)
+        re = self.parse_grobid_json(grobid_meta)
 
-        if not grobid_meta.get('title'):
-            return False
+        if not (fe and re):
+            return None
 
         # lookup existing file SHA1
+        existing = None
         try:
-            existing_file = self.api.lookup_file(sha1=sha1)
+            existing = self.api.lookup_file(sha1=fe.sha1)
         except fatcat_client.rest.ApiException as err:
             if err.status != 404:
                 raise err
-            existing_file = None
 
         # if file is already in here, presumably not actually long-tail
+        # HACK: this is doing an exists check in parse_record(), which is weird
         # TODO: this is where we should check if the file actually has
         # release_ids and/or URLs associated with it
-        if existing_file and not self.bezerk_mode:
-            return False
-        return True
-
-    def parse_record(self, row):
-
-        fields = row.split('\t')
-        sha1_key = fields[0]
-        cdx = json.loads(fields[1])
-        mimetype = fields[2]
-        file_size = int(fields[3])
-        grobid_meta = json.loads(fields[4])
-        fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size)
-        re = self.parse_grobid_json(grobid_meta)
-        assert (fe and re)
+        if existing and not self.bezerk_mode:
+            self.counts['exists'] += 1
+            self.counts['skip'] -= 1
+            return None
 
         release_edit = self.create_release(re)
         fe.release_ids.append(release_edit.ident)
         return fe
 
     def parse_grobid_json(self, obj):
-        assert obj.get('title')
+
+        if not obj.get('title'):
+            return None
 
         extra = dict()
@@ -196,8 +192,8 @@ class GrobidMetadataImporter(EntityImporter):
 
         return fe
 
-    def try_update(entity):
-        # we did this in want()
+    def try_update(self, entity):
+        # did the exists check in 'parse_record()', because we needed to create a release
         return True
 
     def insert_batch(self, batch):
```
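
The pusher change in `common.py` is what makes the new tests practical: each `RecordPusher` now returns the importer's count `Counter` instead of only printing it, and `reset()` clears importer state between runs. A minimal usage sketch, assuming the loop shown above lives in a `run()` method and that `JsonLinePusher` takes an importer plus a file handle (neither signature appears in these hunks):

```python
# Hedged sketch, not from this commit: exercise an importer through a
# pusher and assert on the returned counts. The run() method name and
# the JsonLinePusher(importer, file) signature are assumptions; only
# the loop bodies appear in the hunks above.
with open('records.json', 'r') as f:
    counts = JsonLinePusher(importer, f).run()

assert counts['insert'] >= 1    # run() now returns the Counter
assert counts['exists'] == 0    # rather than only printing it

# reset() (also new in this commit) clears the counts, editgroup state,
# and entity queue, so one importer instance can be reused per test:
importer.reset()
assert importer.counts['insert'] == 0
```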

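One subtlety in the `grobid_metadata.py` hunk deserves a note: moving the exists check into `parse_record()` means a `None` return would otherwise be tallied as a plain skip. A sketch of the assumed caller behavior in the `EntityImporter` base class (that code path is not part of this diff), showing why the importer pre-decrements `counts['skip']`:

```python
# Assumed base-class behavior, not shown in this diff: push_record()
# counts any None from parse_record() as a generic skip.
def push_record(self, raw_record):
    if not self.want(raw_record):
        self.counts['skip'] += 1
        return
    entity = self.parse_record(raw_record)
    if entity is None:
        self.counts['skip'] += 1  # this is the increment that...
        return
    self.push_entity(entity)

# ...the grobid importer cancels out in advance, reclassifying the
# record as 'exists' rather than 'skip':
#
#     self.counts['exists'] += 1
#     self.counts['skip'] -= 1   # net effect: skip +0, exists +1
#     return None
```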