From dd219464cfc90b9b469fd851b48b08668ff17ba8 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 1 Oct 2021 15:07:20 -0700 Subject: importer common: more verbose logging (with counts) --- python/fatcat_tools/importers/common.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 6815a155..6ca9a50c 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -732,8 +732,8 @@ class KafkaBs4XmlPusher(RecordPusher): batch = self.consumer.consume( num_messages=self.consume_batch_size, timeout=self.poll_interval) - print("... got {} kafka messages ({}sec poll interval)".format( - len(batch), self.poll_interval)) + print("... got {} kafka messages ({}sec poll interval) {}".format( + len(batch), self.poll_interval, self.importer.counts)) if not batch: if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): # it has been some time, so flush any current editgroup @@ -796,8 +796,8 @@ class KafkaJsonPusher(RecordPusher): batch = self.consumer.consume( num_messages=self.consume_batch_size, timeout=self.poll_interval) - print("... got {} kafka messages ({}sec poll interval)".format( - len(batch), self.poll_interval)) + print("... got {} kafka messages ({}sec poll interval) {}".format( + len(batch), self.poll_interval, self.importer.counts)) if not batch: if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): # it has been some time, so flush any current editgroup -- cgit v1.2.3 From 6e0736cebcb2b1e5ddbae03127572ad9d1ffca49 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 1 Oct 2021 15:11:38 -0700 Subject: ingest importer behavior tweaks - change order of 'want()' checks, so that result counts are clearer - don't require GROBID success for file imports with SPN --- python/fatcat_tools/importers/ingest.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py index ae3e147a..fc02058b 100644 --- a/python/fatcat_tools/importers/ingest.py +++ b/python/fatcat_tools/importers/ingest.py @@ -324,7 +324,7 @@ class SavePaperNowFileImporter(IngestFileResultImporter): eg_extra = kwargs.pop('editgroup_extra', dict()) eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileSavePaperNow') kwargs['submit_mode'] = submit_mode - kwargs['require_grobid'] = True + kwargs['require_grobid'] = False kwargs['do_updates'] = False super().__init__(api, editgroup_description=eg_desc, @@ -333,9 +333,6 @@ class SavePaperNowFileImporter(IngestFileResultImporter): def want(self, row): - if not self.want_file(row): - return False - source = row['request'].get('ingest_request_source') if not source: self.counts['skip-ingest_request_source'] += 1 @@ -343,10 +340,14 @@ class SavePaperNowFileImporter(IngestFileResultImporter): if not source.startswith('savepapernow'): self.counts['skip-not-savepapernow'] += 1 return False + if row.get('hit') != True: self.counts['skip-hit'] += 1 return False + if not self.want_file(row): + return False + return True def insert_batch(self, batch): @@ -390,14 +391,13 @@ class IngestWebResultImporter(IngestFileResultImporter): if not self.want_ingest(row): return False - if not row.get('file_meta'): - self.counts['skip-file-meta'] += 1 - return False - # webcapture-specific filters if row['request'].get('ingest_type') != 'html': self.counts['skip-ingest-type'] += 1 return False + if not row.get('file_meta'): + self.counts['skip-file-meta'] += 1 + return False if row['file_meta'].get('mimetype') not in ("text/html", "application/xhtml+xml"): self.counts['skip-mimetype'] += 1 return False -- cgit v1.2.3 From 9618d5146eea046342b69895e68b937a056d2816 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 1 Oct 2021 17:33:42 -0700 Subject: new SPN web (html) importer --- python/fatcat_import.py | 30 +++++++++ python/fatcat_tools/importers/__init__.py | 2 +- python/fatcat_tools/importers/ingest.py | 106 ++++++++++++++++++++++-------- 3 files changed, 111 insertions(+), 27 deletions(-) diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 1dcfec21..7e790fa4 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -180,6 +180,23 @@ def run_savepapernow_file(args): else: JsonLinePusher(ifri, args.json_file).run() +def run_savepapernow_web(args): + ifri = SavePaperNowWebImporter(args.api, + editgroup_description=args.editgroup_description_override, + edit_batch_size=args.batch_size) + if args.kafka_mode: + KafkaJsonPusher( + ifri, + args.kafka_hosts, + args.kafka_env, + "ingest-file-results", + "fatcat-{}-savepapernow-web-result".format(args.kafka_env), + kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size, + ).run() + else: + JsonLinePusher(ifri, args.json_file).run() + def run_grobid_metadata(args): fmi = GrobidMetadataImporter(args.api, edit_batch_size=args.batch_size, @@ -554,6 +571,19 @@ def main(): action='store_true', help="consume from kafka topic (not stdin)") + sub_savepapernow_web = subparsers.add_parser('savepapernow-web-results', + help="add webcapture entities crawled due to async Save Paper Now request") + sub_savepapernow_web.set_defaults( + func=run_savepapernow_web, + auth_var="FATCAT_AUTH_WORKER_SAVEPAPERNOW", + ) + sub_savepapernow_web.add_argument('json_file', + help="ingest-file JSON file to import from", + default=sys.stdin, type=argparse.FileType('r')) + sub_savepapernow_web.add_argument('--kafka-mode', + action='store_true', + help="consume from kafka topic (not stdin)") + sub_grobid_metadata = subparsers.add_parser('grobid-metadata', help="create release and file entities based on GROBID PDF metadata extraction") sub_grobid_metadata.set_defaults( diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 6a2edeac..9cb18506 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -27,7 +27,7 @@ from .orcid import OrcidImporter from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE from .wayback_static import auto_wayback_static from .cdl_dash_dat import auto_cdl_dash_dat -from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter +from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter, SavePaperNowWebImporter from .shadow import ShadowLibraryImporter from .file_meta import FileMetaImporter from .doaj_article import DoajArticleImporter diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py index fc02058b..bc759219 100644 --- a/python/fatcat_tools/importers/ingest.py +++ b/python/fatcat_tools/importers/ingest.py @@ -222,6 +222,12 @@ class IngestFileResultImporter(EntityImporter): if edit_extra['link_source'] == 'doi': edit_extra['link_source_id'] = edit_extra['link_source_id'].lower() + # GROBID metadata, for SPN requests (when there might not be 'success') + if request.get('ingest_type') == 'pdf': + if row.get('grobid') and row['grobid'].get('status') != 'success': + edit_extra['grobid_status_code'] = row['grobid']['status_code'] + edit_extra['grobid_version'] = row['grobid'].get('grobid_version') + return edit_extra def parse_record(self, row): @@ -304,11 +310,19 @@ class IngestFileResultImporter(EntityImporter): return False def insert_batch(self, batch): - self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch( - editgroup=fatcat_openapi_client.Editgroup( + if self.submit_mode: + eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup( description=self.editgroup_description, - extra=self.editgroup_extra), - entity_list=batch)) + extra=self.editgroup_extra)) + for fe in batch: + self.api.create_file(eg.editgroup_id, fe) + self.api.update_editgroup(eg.editgroup_id, eg, submit=True) + else: + self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch( + editgroup=fatcat_openapi_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra), + entity_list=batch)) class SavePaperNowFileImporter(IngestFileResultImporter): @@ -350,24 +364,6 @@ class SavePaperNowFileImporter(IngestFileResultImporter): return True - def insert_batch(self, batch): - """ - Usually running in submit_mode, so we can't use auto_batch method - """ - if self.submit_mode: - eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup( - description=self.editgroup_description, - extra=self.editgroup_extra)) - for fe in batch: - self.api.create_file(eg.editgroup_id, fe) - self.api.update_editgroup(eg.editgroup_id, eg, submit=True) - else: - self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch( - editgroup=fatcat_openapi_client.Editgroup( - description=self.editgroup_description, - extra=self.editgroup_extra), - entity_list=batch)) - class IngestWebResultImporter(IngestFileResultImporter): """ @@ -514,8 +510,66 @@ class IngestWebResultImporter(IngestFileResultImporter): return True def insert_batch(self, batch): - self.api.create_webcapture_auto_batch(fatcat_openapi_client.WebcaptureAutoBatch( - editgroup=fatcat_openapi_client.Editgroup( + if self.submit_mode: + eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup( description=self.editgroup_description, - extra=self.editgroup_extra), - entity_list=batch)) + extra=self.editgroup_extra)) + for fe in batch: + self.api.create_webcapture(eg.editgroup_id, fe) + self.api.update_editgroup(eg.editgroup_id, eg, submit=True) + else: + self.api.create_webcapture_auto_batch(fatcat_openapi_client.WebcaptureAutoBatch( + editgroup=fatcat_openapi_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra), + entity_list=batch)) + +class SavePaperNowWebImporter(IngestWebResultImporter): + """ + Like SavePaperNowFileImporter, but for webcapture (HTML) ingest. + """ + + def __init__(self, api, submit_mode=True, **kwargs): + + eg_desc = kwargs.pop('editgroup_description', None) or "Webcaptures crawled after a public 'Save Paper Now' request" + eg_extra = kwargs.pop('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestWebSavePaperNow') + kwargs['submit_mode'] = submit_mode + kwargs['do_updates'] = False + super().__init__(api, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + + def want(self, row): + """ + Relatively custom want() here, a synthesis of other filters. + + We do currently allow unknown-scope through for this specific code + path, which means allowing hit=false. + """ + + source = row['request'].get('ingest_request_source') + if not source: + self.counts['skip-ingest_request_source'] += 1 + return False + if not source.startswith('savepapernow'): + self.counts['skip-not-savepapernow'] += 1 + return False + + # webcapture-specific filters + if row['request'].get('ingest_type') != 'html': + self.counts['skip-ingest-type'] += 1 + return False + if not row.get('file_meta'): + self.counts['skip-file-meta'] += 1 + return False + if row['file_meta'].get('mimetype') not in ("text/html", "application/xhtml+xml"): + self.counts['skip-mimetype'] += 1 + return False + + if row.get('status') not in ['success', 'unknown-scope']: + self.counts['skip-hit'] += 1 + return False + + return True -- cgit v1.2.3 From b72c18e3518e827bd09044deaadcbf0b0ca50335 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 1 Oct 2021 17:39:40 -0700 Subject: kafka import: optional 'force-flush' mode for some importers Behavior and motivation described in the kafka json import comment. --- python/fatcat_import.py | 3 +++ python/fatcat_tools/importers/common.py | 13 +++++++++++++ 2 files changed, 16 insertions(+) diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 7e790fa4..b82e81c7 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -159,6 +159,7 @@ def run_ingest_web(args): "fatcat-{}-ingest-web-result".format(args.kafka_env), kafka_namespace="sandcrawler", consume_batch_size=args.batch_size, + force_flush=True, ).run() else: JsonLinePusher(iwri, args.json_file).run() @@ -176,6 +177,7 @@ def run_savepapernow_file(args): "fatcat-{}-savepapernow-file-result".format(args.kafka_env), kafka_namespace="sandcrawler", consume_batch_size=args.batch_size, + force_flush=True, ).run() else: JsonLinePusher(ifri, args.json_file).run() @@ -193,6 +195,7 @@ def run_savepapernow_web(args): "fatcat-{}-savepapernow-web-result".format(args.kafka_env), kafka_namespace="sandcrawler", consume_batch_size=args.batch_size, + force_flush=True, ).run() else: JsonLinePusher(ifri, args.json_file).run() diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 6ca9a50c..e936477c 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -779,10 +779,12 @@ class KafkaJsonPusher(RecordPusher): ) self.poll_interval = kwargs.get('poll_interval', 5.0) self.consume_batch_size = kwargs.get('consume_batch_size', 100) + self.force_flush = kwargs.get('force_flush', False) def run(self): count = 0 last_push = datetime.datetime.now() + last_force_flush = datetime.datetime.now() while True: # Note: this is batch-oriented, because underlying importer is # often batch-oriented, but this doesn't confirm that entire batch @@ -798,11 +800,22 @@ class KafkaJsonPusher(RecordPusher): timeout=self.poll_interval) print("... got {} kafka messages ({}sec poll interval) {}".format( len(batch), self.poll_interval, self.importer.counts)) + if self.force_flush: + # this flushing happens even if there have been 'push' events + # more recently. it is intended for, eg, importers off the + # ingest file stream *other than* the usual file importer. the + # web/HTML and savepapernow importers get frequent messages, + # but rare 'want() == True', so need an extra flush + if datetime.datetime.now() - last_force_flush > datetime.timedelta(minutes=5): + self.importer.finish() + last_push = datetime.datetime.now() + last_force_flush = datetime.datetime.now() if not batch: if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): # it has been some time, so flush any current editgroup self.importer.finish() last_push = datetime.datetime.now() + last_force_flush = datetime.datetime.now() #print("Flushed any partial import batch: {}".format(self.importer.counts)) continue # first check errors on entire batch... -- cgit v1.2.3 From d540fb836b73586146d1556640ab55cbc1a04be7 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 1 Oct 2021 17:47:48 -0700 Subject: update changelog with notable ingest importer tweaks --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4065c63e..53eb2805 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,11 +20,14 @@ See also: - reference graph views, based on fuzzy reference dataset in `refcat` and `fatcat-scholar` projects, stored in elasticsearch index +- savepapernow webcapture/html importer ### Fixed - viewing deleted release entities no longer result in 500 error - mediawiki (wikipedia) OAuth logins (bytes/str bug in fatcat code) +- savepapernow ingest importer force flushes every 5 minutes; should fix a bug + with many SPN ingest requests not getting imported ## [0.3.4] - 2021-05-25 -- cgit v1.2.3