diff options
-rwxr-xr-x | python/fatcat_import.py | 24 | ||||
-rw-r--r-- | python/fatcat_tools/importers/__init__.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/importers/ingest.py | 67 |
3 files changed, 89 insertions, 4 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 04f58ff7..8d82dab3 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -105,6 +105,17 @@ def run_ingest_file(args): else: JsonLinePusher(ifri, args.json_file).run() +def run_savepapernow_file(args): + ifri = SavePaperNowFileImporter(args.api, + editgroup_description=args.editgroup_description_override, + edit_batch_size=args.batch_size) + if args.kafka_mode: + KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results", + "savepapernow-file-result", kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size).run() + else: + JsonLinePusher(ifri, args.json_file).run() + def run_grobid_metadata(args): fmi = GrobidMetadataImporter(args.api, edit_batch_size=args.batch_size, @@ -361,6 +372,19 @@ def main(): default="web", help="default URL rel for matches (eg, 'publisher', 'web')") + sub_savepapernow_file = subparsers.add_parser('savepapernow-file-results', + help="add file entities crawled due to async Save Paper Now request") + sub_savepapernow_file.set_defaults( + func=run_savepapernow_file, + auth_var="FATCAT_AUTH_WORKER_SAVEPAPERNOW", + ) + sub_savepapernow_file.add_argument('json_file', + help="ingest-file JSON file to import from", + default=sys.stdin, type=argparse.FileType('r')) + sub_savepapernow_file.add_argument('--kafka-mode', + action='store_true', + help="consume from kafka topic (not stdin)") + sub_grobid_metadata = subparsers.add_parser('grobid-metadata', help="create release and file entities based on GROBID PDF metadata extraction") sub_grobid_metadata.set_defaults( diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 025a111c..bb9c5b17 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -26,4 +26,4 @@ from .orcid import OrcidImporter from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE from .wayback_static import auto_wayback_static from .cdl_dash_dat import auto_cdl_dash_dat -from .ingest import IngestFileResultImporter +from .ingest import IngestFileResultImporter, SavePaperNowFileImporter diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py index deb4ef51..e5484048 100644 --- a/python/fatcat_tools/importers/ingest.py +++ b/python/fatcat_tools/importers/ingest.py @@ -11,8 +11,7 @@ class IngestFileResultImporter(EntityImporter): def __init__(self, api, require_grobid=True, **kwargs): - eg_desc = kwargs.pop('editgroup_description', - "Files crawled from web using sandcrawler ingest tool") + eg_desc = kwargs.pop('editgroup_description', None) or "Files crawled from web using sandcrawler ingest tool" eg_extra = kwargs.pop('editgroup_extra', dict()) eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileResultImporter') super().__init__(api, @@ -53,9 +52,14 @@ class IngestFileResultImporter(EntityImporter): if row.get('hit') != True: self.counts['skip-hit'] += 1 return False - if self.ingest_request_source_whitelist and row['request'].get('ingest_request_source') not in self.ingest_request_source_whitelist: + source = row['request'].get('ingest_request_source') + if self.ingest_request_source_whitelist and source not in self.ingest_request_source_whitelist: self.counts['skip-ingest_request_source'] += 1 return False + if source.startswith('savepapernow'): + # never process async savepapernow requests + self.counts['skip-savepapernow'] += 1 + return False if not row.get('file_meta'): self.counts['skip-file-meta'] += 1 return False @@ -167,3 +171,60 @@ class IngestFileResultImporter(EntityImporter): extra=self.editgroup_extra), entity_list=batch)) + +class SavePaperNowFileImporter(IngestFileResultImporter): + """ + This worker ingests from the same feed as IngestFileResultImporter, but + only imports files from anonymous save-paper-now requests, and "submits" + them for further human review (as opposed to accepting by default). + """ + + def __init__(self, api, submit_mode=True, **kwargs): + + eg_desc = kwargs.pop('editgroup_description', None) or "Files crawled after a public 'Save Paper Now' request" + eg_extra = kwargs.pop('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileSavePaperNow') + kwargs['submit_mode'] = submit_mode + kwargs['require_grobid'] = True + kwargs['do_updates'] = False + super().__init__(api, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + + def want(self, row): + + source = row['request'].get('ingest_request_source') + if not source.startswith('savepapernow'): + self.counts['skip-not-savepapernow'] += 1 + return False + if row.get('hit') != True: + self.counts['skip-hit'] += 1 + return False + if not row.get('file_meta'): + self.counts['skip-file-meta'] += 1 + return False + if self.require_grobid and row.get('grobid', {}).get('status_code') != 200: + self.counts['skip-grobid'] += 1 + return False + + return True + + def insert_batch(self, batch): + """ + Usually running in submit_mode, so we can't use auto_batch method + """ + if self.submit_mode: + eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra)) + for fe in batch: + self.api.create_file(eg.editgroup_id, fe) + self.api.update_editgroup(eg.editgroup_id, eg, submit=True) + else: + self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch( + editgroup=fatcat_openapi_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra), + entity_list=batch)) + |