-rw-r--r--  CHANGELOG.md                                |   3
-rwxr-xr-x  python/fatcat_import.py                     |  33
-rw-r--r--  python/fatcat_tools/importers/__init__.py   |   2
-rw-r--r--  python/fatcat_tools/importers/common.py     |  21
-rw-r--r--  python/fatcat_tools/importers/ingest.py     | 122
5 files changed, 142 insertions, 39 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4065c63e..53eb2805 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,11 +20,14 @@ See also:
 
 - reference graph views, based on fuzzy reference dataset in `refcat` and
   `fatcat-scholar` projects, stored in elasticsearch index
+- savepapernow webcapture/html importer
 
 ### Fixed
 
 - viewing deleted release entities no longer result in 500 error
 - mediawiki (wikipedia) OAuth logins (bytes/str bug in fatcat code)
+- savepapernow ingest importer force flushes every 5 minutes; should fix a bug
+  with many SPN ingest requests not getting imported
 
 ## [0.3.4] - 2021-05-25
 
diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index 1dcfec21..b82e81c7 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -159,6 +159,7 @@ def run_ingest_web(args):
             "fatcat-{}-ingest-web-result".format(args.kafka_env),
             kafka_namespace="sandcrawler",
             consume_batch_size=args.batch_size,
+            force_flush=True,
         ).run()
     else:
         JsonLinePusher(iwri, args.json_file).run()
@@ -176,6 +177,25 @@ def run_savepapernow_file(args):
             "fatcat-{}-savepapernow-file-result".format(args.kafka_env),
             kafka_namespace="sandcrawler",
             consume_batch_size=args.batch_size,
+            force_flush=True,
+        ).run()
+    else:
+        JsonLinePusher(ifri, args.json_file).run()
+
+def run_savepapernow_web(args):
+    ifri = SavePaperNowWebImporter(args.api,
+        editgroup_description=args.editgroup_description_override,
+        edit_batch_size=args.batch_size)
+    if args.kafka_mode:
+        KafkaJsonPusher(
+            ifri,
+            args.kafka_hosts,
+            args.kafka_env,
+            "ingest-file-results",
+            "fatcat-{}-savepapernow-web-result".format(args.kafka_env),
+            kafka_namespace="sandcrawler",
+            consume_batch_size=args.batch_size,
+            force_flush=True,
         ).run()
     else:
         JsonLinePusher(ifri, args.json_file).run()
@@ -554,6 +574,19 @@ def main():
         action='store_true',
         help="consume from kafka topic (not stdin)")
 
+    sub_savepapernow_web = subparsers.add_parser('savepapernow-web-results',
+        help="add webcapture entities crawled due to async Save Paper Now request")
+    sub_savepapernow_web.set_defaults(
+        func=run_savepapernow_web,
+        auth_var="FATCAT_AUTH_WORKER_SAVEPAPERNOW",
+    )
+    sub_savepapernow_web.add_argument('json_file',
+        help="ingest-file JSON file to import from",
+        default=sys.stdin, type=argparse.FileType('r'))
+    sub_savepapernow_web.add_argument('--kafka-mode',
+        action='store_true',
+        help="consume from kafka topic (not stdin)")
+
     sub_grobid_metadata = subparsers.add_parser('grobid-metadata',
         help="create release and file entities based on GROBID PDF metadata extraction")
     sub_grobid_metadata.set_defaults(
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 6a2edeac..9cb18506 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -27,7 +27,7 @@ from .orcid import OrcidImporter
 from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE
 from .wayback_static import auto_wayback_static
 from .cdl_dash_dat import auto_cdl_dash_dat
-from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter
+from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter, SavePaperNowWebImporter
 from .shadow import ShadowLibraryImporter
 from .file_meta import FileMetaImporter
 from .doaj_article import DoajArticleImporter
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 6815a155..e936477c 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -732,8 +732,8 @@ class KafkaBs4XmlPusher(RecordPusher):
             batch = self.consumer.consume(
                 num_messages=self.consume_batch_size,
                 timeout=self.poll_interval)
-            print("... got {} kafka messages ({}sec poll interval)".format(
-                len(batch), self.poll_interval))
+            print("... got {} kafka messages ({}sec poll interval) {}".format(
+                len(batch), self.poll_interval, self.importer.counts))
             if not batch:
                 if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
                     # it has been some time, so flush any current editgroup
@@ -779,10 +779,12 @@ class KafkaJsonPusher(RecordPusher):
         )
         self.poll_interval = kwargs.get('poll_interval', 5.0)
         self.consume_batch_size = kwargs.get('consume_batch_size', 100)
+        self.force_flush = kwargs.get('force_flush', False)
 
     def run(self):
         count = 0
         last_push = datetime.datetime.now()
+        last_force_flush = datetime.datetime.now()
         while True:
             # Note: this is batch-oriented, because underlying importer is
             # often batch-oriented, but this doesn't confirm that entire batch
@@ -796,13 +798,24 @@ class KafkaJsonPusher(RecordPusher):
             batch = self.consumer.consume(
                 num_messages=self.consume_batch_size,
                 timeout=self.poll_interval)
-            print("... got {} kafka messages ({}sec poll interval)".format(
-                len(batch), self.poll_interval))
+            print("... got {} kafka messages ({}sec poll interval) {}".format(
+                len(batch), self.poll_interval, self.importer.counts))
+            if self.force_flush:
+                # this flushing happens even if there have been 'push' events
+                # more recently. it is intended for, eg, importers off the
+                # ingest file stream *other than* the usual file importer. the
+                # web/HTML and savepapernow importers get frequent messages,
+                # but rare 'want() == True', so need an extra flush
+                if datetime.datetime.now() - last_force_flush > datetime.timedelta(minutes=5):
+                    self.importer.finish()
+                    last_push = datetime.datetime.now()
+                    last_force_flush = datetime.datetime.now()
             if not batch:
                 if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
                     # it has been some time, so flush any current editgroup
                     self.importer.finish()
                     last_push = datetime.datetime.now()
+                    last_force_flush = datetime.datetime.now()
                     #print("Flushed any partial import batch: {}".format(self.importer.counts))
                 continue
             # first check errors on entire batch...
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index ae3e147a..bc759219 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -222,6 +222,12 @@ class IngestFileResultImporter(EntityImporter):
             if edit_extra['link_source'] == 'doi':
                 edit_extra['link_source_id'] = edit_extra['link_source_id'].lower()
 
+        # GROBID metadata, for SPN requests (when there might not be 'success')
+        if request.get('ingest_type') == 'pdf':
+            if row.get('grobid') and row['grobid'].get('status') != 'success':
+                edit_extra['grobid_status_code'] = row['grobid']['status_code']
+                edit_extra['grobid_version'] = row['grobid'].get('grobid_version')
+
         return edit_extra
 
     def parse_record(self, row):
@@ -304,11 +310,19 @@ class IngestFileResultImporter(EntityImporter):
         return False
 
     def insert_batch(self, batch):
-        self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
-            editgroup=fatcat_openapi_client.Editgroup(
+        if self.submit_mode:
+            eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup(
                 description=self.editgroup_description,
-                extra=self.editgroup_extra),
-            entity_list=batch))
+                extra=self.editgroup_extra))
+            for fe in batch:
+                self.api.create_file(eg.editgroup_id, fe)
+            self.api.update_editgroup(eg.editgroup_id, eg, submit=True)
+        else:
+            self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
+                editgroup=fatcat_openapi_client.Editgroup(
+                    description=self.editgroup_description,
+                    extra=self.editgroup_extra),
+                entity_list=batch))
 
 
 class SavePaperNowFileImporter(IngestFileResultImporter):
@@ -324,7 +338,7 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
         eg_extra = kwargs.pop('editgroup_extra', dict())
         eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileSavePaperNow')
         kwargs['submit_mode'] = submit_mode
-        kwargs['require_grobid'] = True
+        kwargs['require_grobid'] = False
         kwargs['do_updates'] = False
         super().__init__(api,
             editgroup_description=eg_desc,
@@ -333,9 +347,6 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
 
     def want(self, row):
 
-        if not self.want_file(row):
-            return False
-
         source = row['request'].get('ingest_request_source')
         if not source:
             self.counts['skip-ingest_request_source'] += 1
@@ -343,29 +354,15 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
         if not source.startswith('savepapernow'):
             self.counts['skip-not-savepapernow'] += 1
             return False
+
         if row.get('hit') != True:
             self.counts['skip-hit'] += 1
             return False
-        return True
 
+        if not self.want_file(row):
+            return False
 
-    def insert_batch(self, batch):
-        """
-        Usually running in submit_mode, so we can't use auto_batch method
-        """
-        if self.submit_mode:
-            eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup(
-                description=self.editgroup_description,
-                extra=self.editgroup_extra))
-            for fe in batch:
-                self.api.create_file(eg.editgroup_id, fe)
-            self.api.update_editgroup(eg.editgroup_id, eg, submit=True)
-        else:
-            self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
-                editgroup=fatcat_openapi_client.Editgroup(
-                    description=self.editgroup_description,
-                    extra=self.editgroup_extra),
-                entity_list=batch))
+        return True
 
 
 class IngestWebResultImporter(IngestFileResultImporter):
@@ -390,14 +387,13 @@ class IngestWebResultImporter(IngestFileResultImporter):
         if not self.want_ingest(row):
             return False
 
-        if not row.get('file_meta'):
-            self.counts['skip-file-meta'] += 1
-            return False
-
         # webcapture-specific filters
         if row['request'].get('ingest_type') != 'html':
             self.counts['skip-ingest-type'] += 1
             return False
+        if not row.get('file_meta'):
+            self.counts['skip-file-meta'] += 1
+            return False
         if row['file_meta'].get('mimetype') not in ("text/html", "application/xhtml+xml"):
             self.counts['skip-mimetype'] += 1
             return False
@@ -514,8 +510,66 @@ class IngestWebResultImporter(IngestFileResultImporter):
         return True
 
     def insert_batch(self, batch):
-        self.api.create_webcapture_auto_batch(fatcat_openapi_client.WebcaptureAutoBatch(
-            editgroup=fatcat_openapi_client.Editgroup(
+        if self.submit_mode:
+            eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup(
                 description=self.editgroup_description,
-                extra=self.editgroup_extra),
-            entity_list=batch))
+                extra=self.editgroup_extra))
+            for fe in batch:
+                self.api.create_webcapture(eg.editgroup_id, fe)
+            self.api.update_editgroup(eg.editgroup_id, eg, submit=True)
+        else:
+            self.api.create_webcapture_auto_batch(fatcat_openapi_client.WebcaptureAutoBatch(
+                editgroup=fatcat_openapi_client.Editgroup(
+                    description=self.editgroup_description,
+                    extra=self.editgroup_extra),
+                entity_list=batch))
+
+class SavePaperNowWebImporter(IngestWebResultImporter):
+    """
+    Like SavePaperNowFileImporter, but for webcapture (HTML) ingest.
+    """
+
+    def __init__(self, api, submit_mode=True, **kwargs):
+
+        eg_desc = kwargs.pop('editgroup_description', None) or "Webcaptures crawled after a public 'Save Paper Now' request"
+        eg_extra = kwargs.pop('editgroup_extra', dict())
+        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestWebSavePaperNow')
+        kwargs['submit_mode'] = submit_mode
+        kwargs['do_updates'] = False
+        super().__init__(api,
+            editgroup_description=eg_desc,
+            editgroup_extra=eg_extra,
+            **kwargs)
+
+    def want(self, row):
+        """
+        Relatively custom want() here, a synthesis of other filters.
+
+        We do currently allow unknown-scope through for this specific code
+        path, which means allowing hit=false.
+        """
+
+        source = row['request'].get('ingest_request_source')
+        if not source:
+            self.counts['skip-ingest_request_source'] += 1
+            return False
+        if not source.startswith('savepapernow'):
+            self.counts['skip-not-savepapernow'] += 1
+            return False
+
+        # webcapture-specific filters
+        if row['request'].get('ingest_type') != 'html':
+            self.counts['skip-ingest-type'] += 1
+            return False
+        if not row.get('file_meta'):
+            self.counts['skip-file-meta'] += 1
+            return False
+        if row['file_meta'].get('mimetype') not in ("text/html", "application/xhtml+xml"):
+            self.counts['skip-mimetype'] += 1
+            return False
+
+        if row.get('status') not in ['success', 'unknown-scope']:
+            self.counts['skip-hit'] += 1
+            return False
+
+        return True
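Note on trying the new subcommand: the worker form is roughly `./fatcat_import.py savepapernow-web-results --kafka-mode -` (broker and environment settings come from options defined elsewhere in fatcat_import.py, and the bot token from FATCAT_AUTH_WORKER_SAVEPAPERNOW per the set_defaults above), while passing a JSON-lines file instead of `-` imports from disk. Below is a minimal Python sketch of what the non-Kafka branch of run_savepapernow_web() wires together; the `authenticated_api` helper usage, host URL, and input filename are assumptions for illustration, not taken from this diff.

    # Sketch of the non-Kafka import path (equivalent to run_savepapernow_web()
    # without argparse). Host URL and filename below are placeholders.
    import os

    from fatcat_tools import authenticated_api
    from fatcat_tools.importers import JsonLinePusher, SavePaperNowWebImporter

    api = authenticated_api(
        "https://api.fatcat.wiki/v0",                          # placeholder API host
        token=os.environ["FATCAT_AUTH_WORKER_SAVEPAPERNOW"],   # same auth_var as the subcommand
    )
    spn_web = SavePaperNowWebImporter(
        api,
        editgroup_description=None,  # falls back to the built-in SPN webcapture description
        edit_batch_size=100,         # run_savepapernow_web() passes args.batch_size here
    )
    # each input line is one sandcrawler ingest-file result JSON object
    with open("savepapernow_web_results.json") as f:           # hypothetical input file
        JsonLinePusher(spn_web, f).run()
    print(spn_web.counts)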

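The force_flush behavior referenced in the CHANGELOG entry is opt-in per pusher; in Kafka mode the new subcommand builds its pusher roughly as below. The broker list, environment name, and batch size are placeholders, while the topic, consumer group pattern, namespace, and force_flush flag are as in the diff; `api` is built as in the previous sketch.

    # Sketch of the Kafka branch of run_savepapernow_web()
    from fatcat_tools.importers import KafkaJsonPusher, SavePaperNowWebImporter

    KafkaJsonPusher(
        SavePaperNowWebImporter(api),
        "kafka-broker.example.org:9092",         # placeholder for args.kafka_hosts
        "prod",                                  # placeholder for args.kafka_env
        "ingest-file-results",                   # topic (namespaced under "sandcrawler")
        "fatcat-prod-savepapernow-web-result",   # consumer group
        kafka_namespace="sandcrawler",
        consume_batch_size=100,
        force_flush=True,  # flush/submit any open editgroup every 5 minutes, even when
                           # most consumed messages are skipped by want()
    ).run()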