diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2020-11-05 22:24:58 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2020-11-05 22:24:58 -0800 |
commit | 931d5e450c9998177fc222b3d5b41ce16a947569 (patch) | |
tree | acf9d165c3d0137cbb4d90d5736e66bca1d750fc /python | |
parent | 0c7dd38ed09c7a0584d079335fb3d1d53434628c (diff) | |
download | fatcat-931d5e450c9998177fc222b3d5b41ce16a947569.tar.gz fatcat-931d5e450c9998177fc222b3d5b41ce16a947569.zip |
ingest: initial 'web' worker implementation
Diffstat (limited to 'python')
-rwxr-xr-x | python/fatcat_import.py | 42 | ||||
-rw-r--r-- | python/fatcat_tools/importers/__init__.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/importers/ingest.py | 324 |
3 files changed, 301 insertions, 67 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 5dc10f0e..19cf43ec 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -144,6 +144,26 @@ def run_ingest_file(args): else: JsonLinePusher(ifri, args.json_file).run() +def run_ingest_web(args): + iwri = IngestWebResultImporter(args.api, + editgroup_description=args.editgroup_description_override, + skip_source_allowlist=args.skip_source_allowlist, + do_updates=args.do_updates, + default_link_rel=args.default_link_rel, + edit_batch_size=args.batch_size) + if args.kafka_mode: + KafkaJsonPusher( + iwri, + args.kafka_hosts, + args.kafka_env, + "ingest-file-results", + "fatcat-{}-ingest-web-result".format(args.kafka_env), + kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size, + ).run() + else: + JsonLinePusher(iwri, args.json_file).run() + def run_savepapernow_file(args): ifri = SavePaperNowFileImporter(args.api, editgroup_description=args.editgroup_description_override, @@ -458,6 +478,28 @@ def main(): default="web", help="default URL rel for matches (eg, 'publisher', 'web')") + sub_ingest_web = subparsers.add_parser('ingest-web-results', + help="add/update web entities linked to releases based on sandcrawler ingest results") + sub_ingest_web.set_defaults( + func=run_ingest_web, + auth_var="FATCAT_AUTH_WORKER_CRAWL", + ) + sub_ingest_web.add_argument('json_file', + help="ingest_web JSON file to import from", + default=sys.stdin, type=argparse.FileType('r')) + sub_ingest_web.add_argument('--skip-source-allowlist', + action='store_true', + help="don't filter import based on request source allowlist") + sub_ingest_web.add_argument('--kafka-mode', + action='store_true', + help="consume from kafka topic (not stdin)") + sub_ingest_web.add_argument('--do-updates', + action='store_true', + help="update pre-existing web entities if new match (instead of skipping)") + sub_ingest_web.add_argument('--default-link-rel', + default="web", + help="default URL rel for matches (eg, 'publisher', 'web')") + sub_savepapernow_file = subparsers.add_parser('savepapernow-file-results', help="add file entities crawled due to async Save Paper Now request") sub_savepapernow_file.set_defaults( diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index b82eb11a..c08e04c2 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -27,6 +27,6 @@ from .orcid import OrcidImporter from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE from .wayback_static import auto_wayback_static from .cdl_dash_dat import auto_cdl_dash_dat -from .ingest import IngestFileResultImporter, SavePaperNowFileImporter +from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter from .shadow import ShadowLibraryImporter from .file_meta import FileMetaImporter diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py index b6851eec..2042d331 100644 --- a/python/fatcat_tools/importers/ingest.py +++ b/python/fatcat_tools/importers/ingest.py @@ -38,39 +38,11 @@ class IngestFileResultImporter(EntityImporter): if kwargs.get('skip_source_allowlist', False): self.ingest_request_source_allowlist = [] - def want(self, row): + def want_file(self, row) -> bool: """ - Logic here probably needs work (TODO): - - - Direct ingests via DOI from fatcat-changelog should probably go - through regardless of GROBID status - - We should filter/block things like single-page PDFs here - - public/anonymous submissions could require successful biblio-glutton - match, or some other sanity check on the fatcat side (eg, fuzzy title - match) - - handle the case of release_stage not being 'published'; if pre-print, - potentially create a new release. - - The current logic is intentionally conservative as a first step. + File-specific part of want(). Generic across general ingest and save-paper-now. """ - if row.get('hit') != True: - self.counts['skip-hit'] += 1 - return False - source = row['request'].get('ingest_request_source') - if not source: - self.counts['skip-ingest_request_source'] += 1 - return False - if self.ingest_request_source_whitelist and source not in self.ingest_request_source_whitelist: - self.counts['skip-ingest_request_source'] += 1 - return False - if source.startswith('arabesque'): - if row['request'].get('link_source') not in ('arxiv', 'pmc', 'unpaywall', 'doi', 'mag', 's2'): - self.counts['skip-arabesque-source'] += 1 - return False - if source.startswith('savepapernow'): - # never process async savepapernow requests - self.counts['skip-savepapernow'] += 1 - return False + if not row.get('file_meta'): self.counts['skip-file-meta'] += 1 return False @@ -94,25 +66,60 @@ class IngestFileResultImporter(EntityImporter): return True - def parse_record(self, row): + def want_ingest(self, row) -> bool: + """ + Sandcrawler ingest-specific part of want(). Generic across file and + webcapture ingest. + """ + if row.get('hit') != True: + self.counts['skip-hit'] += 1 + return False + source = row['request'].get('ingest_request_source') + if not source: + self.counts['skip-ingest_request_source'] += 1 + return False + if self.ingest_request_source_allowlist and source not in self.ingest_request_source_allowlist: + self.counts['skip-ingest_request_source'] += 1 + return False + + if row['request'].get('link_source') not in ('arxiv', 'pmc', 'unpaywall', 'doi', 'mag', 's2'): + self.counts['skip-link-source'] += 1 + return False + + if source.startswith('savepapernow'): + # never process async savepapernow requests + self.counts['skip-savepapernow'] += 1 + return False + + return True + + def want(self, row): + """ + Overall logic here probably needs work (TODO): + + - Direct ingests via DOI from fatcat-changelog should probably go + through regardless of GROBID status + - We should filter/block things like single-page PDFs here + - public/anonymous submissions could require successful biblio-glutton + match, or some other sanity check on the fatcat side (eg, fuzzy title + match) + - handle the case of release_stage not being 'published'; if pre-print, + potentially create a new release. + + The current logic is intentionally conservative as a first step. + """ + if not self.want_file(row): + return False + if not self.want_ingest(row): + return False + + return True + + def parse_ingest_release_ident(self, row): request = row['request'] fatcat = request.get('fatcat') - file_meta = row['file_meta'] - - # double check that want() filtered request correctly (eg, old requests) - if request.get('ingest_type') not in ('pdf', 'xml'): - self.counts['skip-ingest-type'] += 1 - return None - assert (request['ingest_type'], file_meta['mimetype']) in [ - ("pdf", "application/pdf"), - ("xml", "application/xml"), - ("xml", "application/jats+xml"), - ("xml", "application/tei+xml"), - ("xml", "text/xml"), - ] - # identify release by fatcat ident, or extid lookup, or biblio-glutton match release_ident = None if fatcat and fatcat.get('release_ident'): release_ident = fatcat.get('release_ident') @@ -138,16 +145,16 @@ class IngestFileResultImporter(EntityImporter): return None release_ident = release.ident break + if self.use_glutton_match and not release_ident and row.get('grobid'): # try biblio-glutton extracted hit if row['grobid'].get('fatcat_release'): release_ident = row['grobid']['fatcat_release'].split('_')[-1] self.counts['glutton-match'] += 1 - if not release_ident: - self.counts['skip-release-not-found'] += 1 - return None + return release_ident + def parse_terminal(self, row): terminal = row.get('terminal') if not terminal: # support old cdx-only ingest results @@ -170,6 +177,10 @@ class IngestFileResultImporter(EntityImporter): terminal['terminal_dt'] = terminal['dt'] assert len(terminal['terminal_dt']) == 14 + def parse_urls(self, row, terminal): + + request = row['request'] + default_rel = self.default_link_rel if request.get('link_source') == 'doi': default_rel = 'publisher' @@ -185,6 +196,51 @@ class IngestFileResultImporter(EntityImporter): urls = [url, ("webarchive", wayback)] urls = [fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in urls] + return urls + + def parse_edit_extra(self, row): + + request = row['request'] + edit_extra = dict() + + if request.get('edit_extra'): + edit_extra = request['edit_extra'] + + if request.get('ingest_request_source'): + edit_extra['ingest_request_source'] = request['ingest_request_source'] + if request.get('link_source') and request.get('link_source_id'): + edit_extra['link_source'] = request['link_source'] + edit_extra['link_source_id'] = request['link_source_id'] + + return edit_extra + + def parse_record(self, row): + + request = row['request'] + fatcat = request.get('fatcat') + file_meta = row['file_meta'] + + # double check that want() filtered request correctly (eg, old requests) + if request.get('ingest_type') not in ('pdf', 'xml'): + self.counts['skip-ingest-type'] += 1 + return None + assert (request['ingest_type'], file_meta['mimetype']) in [ + ("pdf", "application/pdf"), + ("xml", "application/xml"), + ("xml", "application/jats+xml"), + ("xml", "application/tei+xml"), + ("xml", "text/xml"), + ] + + # identify release by fatcat ident, or extid lookup, or biblio-glutton match + release_ident = self.parse_ingest_release_ident(row) + + if not release_ident: + self.counts['skip-release-not-found'] += 1 + return None + + terminal = self.parse_terminal(row) + urls = self.parse_urls(row, terminal) fe = fatcat_openapi_client.FileEntity( md5=file_meta['md5hex'], @@ -195,17 +251,10 @@ class IngestFileResultImporter(EntityImporter): release_ids=[release_ident], urls=urls, ) - if request.get('edit_extra'): - fe.edit_extra = request['edit_extra'] - else: - fe.edit_extra = dict() - if request.get('ingest_request_source'): - fe.edit_extra['ingest_request_source'] = request['ingest_request_source'] - if request.get('link_source') and request.get('link_source_id'): - fe.edit_extra['link_source'] = request['link_source'] - fe.edit_extra['link_source_id'] = request['link_source_id'] - if not fe.edit_extra: - fe.edit_extra = None + + edit_extra = self.parse_edit_extra(row) + if edit_extra: + fe.edit_extra = edit_extra return fe def try_update(self, fe): @@ -270,6 +319,9 @@ class SavePaperNowFileImporter(IngestFileResultImporter): def want(self, row): + if not self.want_file(row): + return False + source = row['request'].get('ingest_request_source') if not source: self.counts['skip-ingest_request_source'] += 1 @@ -280,12 +332,6 @@ class SavePaperNowFileImporter(IngestFileResultImporter): if row.get('hit') != True: self.counts['skip-hit'] += 1 return False - if not row.get('file_meta'): - self.counts['skip-file-meta'] += 1 - return False - if self.require_grobid and row.get('grobid', {}).get('status_code') != 200: - self.counts['skip-grobid'] += 1 - return False return True @@ -306,3 +352,149 @@ class SavePaperNowFileImporter(IngestFileResultImporter): description=self.editgroup_description, extra=self.editgroup_extra), entity_list=batch)) + +class IngestWebResultImporter(IngestFileResultImporter): + """ + Variant of IngestFileResultImporter for processing HTML ingest requests + into webcapture objects. + """ + + def __init__(self, api, **kwargs): + + eg_desc = kwargs.pop('editgroup_description', None) or "WebCaptures crawled from web using sandcrawler ingest tool" + eg_extra = kwargs.pop('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestWebResultImporter') + kwargs['do_updates'] = False + super().__init__(api, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + + def want(self, row): + + if not self.want_ingest(row): + return False + + if not row.get('file_meta'): + self.counts['skip-file-meta'] += 1 + return False + + # webcapture-specific filters + if row['request'].get('ingest_type') != 'html': + self.counts['skip-ingest-type'] += 1 + return False + if row['file_meta'].get('mimetype') not in ("text/html", "application/html"): + self.counts['skip-mimetype'] += 1 + return False + + return True + + + def parse_record(self, row): + """ + TODO: more of this parsing could be DRY with the file version + """ + + request = row['request'] + file_meta = row['file_meta'] + + # double check that want() filtered request correctly (eg, old requests) + if request.get('ingest_type') != "html": + self.counts['skip-ingest-type'] += 1 + return None + if file_meta['mimetype'] not in ("text/html", "application/html"): + self.counts['skip-mimetype'] += 1 + return None + + # identify release by fatcat ident, or extid lookup + release_ident = self.parse_ingest_release_ident(row) + + if not release_ident: + self.counts['skip-release-not-found'] += 1 + return None + + terminal = self.parse_terminal(row) + urls = self.parse_urls(row, terminal) + archive_urls = [u for u in urls if u['rel'] == 'webarchive'] + + if terminal['terminal_status_code'] != 200: + self.counts['skip-terminal-status-code'] += 1 + return None + + terminal_cdx = row['cdx'] + if 'revisit_cdx' in row: + terminal_cdx = row['revisit_cdx'] + assert terminal_cdx['surt'] + assert terminal_cdx['url'] == terminal['terminal_url'] + + wc_cdx = [] + # primary resource first + wc_cdx.append(fatcat_openapi_client.WebcaptureCdxLine( + surt=terminal['terminal_surt'], # XXX: from CDX? + timestamp=terminal['terminal_dt'], # as an ISO datetime + url=terminal['terminal_url'], + mimetype=file_meta['mimetype'], + status_code=terminal['terminal_status_code'], + sha1=file_meta['sha1hex'], + sha256=file_meta['sha256hex'], + size=file_meta['size_bytes'], + )) + + for resource in row.get('html_resources', []): + wc_cdx.append(fatcat_openapi_client.WebcaptureCdxLine( + surt=resource['surt'], + timestamp=resource['timestamp'], + url=resource['url'], + mimetype=resource.get('mimetype'), + size=resource.get('size_bytes'), + sha1=resource.get('sha1hex'), + sha256=resource.get('sha256hex'), + )) + + wc = fatcat_openapi_client.WebCaptureEntity( + cdx=wc_cdx, + archive_urls=archive_urls, + original_url=terminal['terminal_url'], + timestamp=terminal['terminal_dt'], + release_ids=[release_ident], + urls=urls, + ) + + edit_extra = self.parse_edit_extra(row) + + if edit_extra: + wc.edit_extra = edit_extra + return wc + + + def try_update(self, wc): + + # check for existing edits-in-progress with same file hash + for other in self._entity_queue: + if other.sha1 == wc.sha1: + self.counts['skip-in-queue'] += 1 + return False + + # lookup sha1, or create new entity + existing = None + # XXX: lookup *release* instead; skip if any existing web capture entities + # XXX: only one release per webcapture + try: + existing = self.api.lookup_file(sha1=wc.sha1) + except fatcat_openapi_client.rest.ApiException as err: + if err.status != 404: + raise err + + if not existing: + return True + else: + # TODO: for now, never update + self.counts['skip-update-disabled'] += 1 + return False + + def insert_batch(self, batch): + self.api.create_webcapture_auto_batch(fatcat_openapi_client.WebCaptureAutoBatch( + editgroup=fatcat_openapi_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra), + entity_list=batch)) |