summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2020-11-05 22:24:58 -0800
committerBryan Newbold <bnewbold@robocracy.org>2020-11-05 22:24:58 -0800
commit931d5e450c9998177fc222b3d5b41ce16a947569 (patch)
treeacf9d165c3d0137cbb4d90d5736e66bca1d750fc
parent0c7dd38ed09c7a0584d079335fb3d1d53434628c (diff)
downloadfatcat-931d5e450c9998177fc222b3d5b41ce16a947569.tar.gz
fatcat-931d5e450c9998177fc222b3d5b41ce16a947569.zip
ingest: initial 'web' worker implementation
-rwxr-xr-xpython/fatcat_import.py42
-rw-r--r--python/fatcat_tools/importers/__init__.py2
-rw-r--r--python/fatcat_tools/importers/ingest.py324
3 files changed, 301 insertions, 67 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index 5dc10f0e..19cf43ec 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -144,6 +144,26 @@ def run_ingest_file(args):
else:
JsonLinePusher(ifri, args.json_file).run()
+def run_ingest_web(args):
+ iwri = IngestWebResultImporter(args.api,
+ editgroup_description=args.editgroup_description_override,
+ skip_source_allowlist=args.skip_source_allowlist,
+ do_updates=args.do_updates,
+ default_link_rel=args.default_link_rel,
+ edit_batch_size=args.batch_size)
+ if args.kafka_mode:
+ KafkaJsonPusher(
+ iwri,
+ args.kafka_hosts,
+ args.kafka_env,
+ "ingest-file-results",
+ "fatcat-{}-ingest-web-result".format(args.kafka_env),
+ kafka_namespace="sandcrawler",
+ consume_batch_size=args.batch_size,
+ ).run()
+ else:
+ JsonLinePusher(iwri, args.json_file).run()
+
def run_savepapernow_file(args):
ifri = SavePaperNowFileImporter(args.api,
editgroup_description=args.editgroup_description_override,
@@ -458,6 +478,28 @@ def main():
default="web",
help="default URL rel for matches (eg, 'publisher', 'web')")
+ sub_ingest_web = subparsers.add_parser('ingest-web-results',
+ help="add/update web entities linked to releases based on sandcrawler ingest results")
+ sub_ingest_web.set_defaults(
+ func=run_ingest_web,
+ auth_var="FATCAT_AUTH_WORKER_CRAWL",
+ )
+ sub_ingest_web.add_argument('json_file',
+ help="ingest_web JSON file to import from",
+ default=sys.stdin, type=argparse.FileType('r'))
+ sub_ingest_web.add_argument('--skip-source-allowlist',
+ action='store_true',
+ help="don't filter import based on request source allowlist")
+ sub_ingest_web.add_argument('--kafka-mode',
+ action='store_true',
+ help="consume from kafka topic (not stdin)")
+ sub_ingest_web.add_argument('--do-updates',
+ action='store_true',
+ help="update pre-existing web entities if new match (instead of skipping)")
+ sub_ingest_web.add_argument('--default-link-rel',
+ default="web",
+ help="default URL rel for matches (eg, 'publisher', 'web')")
+
sub_savepapernow_file = subparsers.add_parser('savepapernow-file-results',
help="add file entities crawled due to async Save Paper Now request")
sub_savepapernow_file.set_defaults(
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index b82eb11a..c08e04c2 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -27,6 +27,6 @@ from .orcid import OrcidImporter
from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE
from .wayback_static import auto_wayback_static
from .cdl_dash_dat import auto_cdl_dash_dat
-from .ingest import IngestFileResultImporter, SavePaperNowFileImporter
+from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter
from .shadow import ShadowLibraryImporter
from .file_meta import FileMetaImporter
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index b6851eec..2042d331 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -38,39 +38,11 @@ class IngestFileResultImporter(EntityImporter):
if kwargs.get('skip_source_allowlist', False):
self.ingest_request_source_allowlist = []
- def want(self, row):
+ def want_file(self, row) -> bool:
"""
- Logic here probably needs work (TODO):
-
- - Direct ingests via DOI from fatcat-changelog should probably go
- through regardless of GROBID status
- - We should filter/block things like single-page PDFs here
- - public/anonymous submissions could require successful biblio-glutton
- match, or some other sanity check on the fatcat side (eg, fuzzy title
- match)
- - handle the case of release_stage not being 'published'; if pre-print,
- potentially create a new release.
-
- The current logic is intentionally conservative as a first step.
+ File-specific part of want(). Generic across general ingest and save-paper-now.
"""
- if row.get('hit') != True:
- self.counts['skip-hit'] += 1
- return False
- source = row['request'].get('ingest_request_source')
- if not source:
- self.counts['skip-ingest_request_source'] += 1
- return False
- if self.ingest_request_source_whitelist and source not in self.ingest_request_source_whitelist:
- self.counts['skip-ingest_request_source'] += 1
- return False
- if source.startswith('arabesque'):
- if row['request'].get('link_source') not in ('arxiv', 'pmc', 'unpaywall', 'doi', 'mag', 's2'):
- self.counts['skip-arabesque-source'] += 1
- return False
- if source.startswith('savepapernow'):
- # never process async savepapernow requests
- self.counts['skip-savepapernow'] += 1
- return False
+
if not row.get('file_meta'):
self.counts['skip-file-meta'] += 1
return False
@@ -94,25 +66,60 @@ class IngestFileResultImporter(EntityImporter):
return True
- def parse_record(self, row):
+ def want_ingest(self, row) -> bool:
+ """
+ Sandcrawler ingest-specific part of want(). Generic across file and
+ webcapture ingest.
+ """
+ if row.get('hit') != True:
+ self.counts['skip-hit'] += 1
+ return False
+ source = row['request'].get('ingest_request_source')
+ if not source:
+ self.counts['skip-ingest_request_source'] += 1
+ return False
+ if self.ingest_request_source_allowlist and source not in self.ingest_request_source_allowlist:
+ self.counts['skip-ingest_request_source'] += 1
+ return False
+
+ if row['request'].get('link_source') not in ('arxiv', 'pmc', 'unpaywall', 'doi', 'mag', 's2'):
+ self.counts['skip-link-source'] += 1
+ return False
+
+ if source.startswith('savepapernow'):
+ # never process async savepapernow requests
+ self.counts['skip-savepapernow'] += 1
+ return False
+
+ return True
+
+ def want(self, row):
+ """
+ Overall logic here probably needs work (TODO):
+
+ - Direct ingests via DOI from fatcat-changelog should probably go
+ through regardless of GROBID status
+ - We should filter/block things like single-page PDFs here
+ - public/anonymous submissions could require successful biblio-glutton
+ match, or some other sanity check on the fatcat side (eg, fuzzy title
+ match)
+ - handle the case of release_stage not being 'published'; if pre-print,
+ potentially create a new release.
+
+ The current logic is intentionally conservative as a first step.
+ """
+ if not self.want_file(row):
+ return False
+ if not self.want_ingest(row):
+ return False
+
+ return True
+
+ def parse_ingest_release_ident(self, row):
request = row['request']
fatcat = request.get('fatcat')
- file_meta = row['file_meta']
-
- # double check that want() filtered request correctly (eg, old requests)
- if request.get('ingest_type') not in ('pdf', 'xml'):
- self.counts['skip-ingest-type'] += 1
- return None
- assert (request['ingest_type'], file_meta['mimetype']) in [
- ("pdf", "application/pdf"),
- ("xml", "application/xml"),
- ("xml", "application/jats+xml"),
- ("xml", "application/tei+xml"),
- ("xml", "text/xml"),
- ]
- # identify release by fatcat ident, or extid lookup, or biblio-glutton match
release_ident = None
if fatcat and fatcat.get('release_ident'):
release_ident = fatcat.get('release_ident')
@@ -138,16 +145,16 @@ class IngestFileResultImporter(EntityImporter):
return None
release_ident = release.ident
break
+
if self.use_glutton_match and not release_ident and row.get('grobid'):
# try biblio-glutton extracted hit
if row['grobid'].get('fatcat_release'):
release_ident = row['grobid']['fatcat_release'].split('_')[-1]
self.counts['glutton-match'] += 1
- if not release_ident:
- self.counts['skip-release-not-found'] += 1
- return None
+ return release_ident
+ def parse_terminal(self, row):
terminal = row.get('terminal')
if not terminal:
# support old cdx-only ingest results
@@ -170,6 +177,10 @@ class IngestFileResultImporter(EntityImporter):
terminal['terminal_dt'] = terminal['dt']
assert len(terminal['terminal_dt']) == 14
+ def parse_urls(self, row, terminal):
+
+ request = row['request']
+
default_rel = self.default_link_rel
if request.get('link_source') == 'doi':
default_rel = 'publisher'
@@ -185,6 +196,51 @@ class IngestFileResultImporter(EntityImporter):
urls = [url, ("webarchive", wayback)]
urls = [fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in urls]
+ return urls
+
+ def parse_edit_extra(self, row):
+
+ request = row['request']
+ edit_extra = dict()
+
+ if request.get('edit_extra'):
+ edit_extra = request['edit_extra']
+
+ if request.get('ingest_request_source'):
+ edit_extra['ingest_request_source'] = request['ingest_request_source']
+ if request.get('link_source') and request.get('link_source_id'):
+ edit_extra['link_source'] = request['link_source']
+ edit_extra['link_source_id'] = request['link_source_id']
+
+ return edit_extra
+
+ def parse_record(self, row):
+
+ request = row['request']
+ fatcat = request.get('fatcat')
+ file_meta = row['file_meta']
+
+ # double check that want() filtered request correctly (eg, old requests)
+ if request.get('ingest_type') not in ('pdf', 'xml'):
+ self.counts['skip-ingest-type'] += 1
+ return None
+ assert (request['ingest_type'], file_meta['mimetype']) in [
+ ("pdf", "application/pdf"),
+ ("xml", "application/xml"),
+ ("xml", "application/jats+xml"),
+ ("xml", "application/tei+xml"),
+ ("xml", "text/xml"),
+ ]
+
+ # identify release by fatcat ident, or extid lookup, or biblio-glutton match
+ release_ident = self.parse_ingest_release_ident(row)
+
+ if not release_ident:
+ self.counts['skip-release-not-found'] += 1
+ return None
+
+ terminal = self.parse_terminal(row)
+ urls = self.parse_urls(row, terminal)
fe = fatcat_openapi_client.FileEntity(
md5=file_meta['md5hex'],
@@ -195,17 +251,10 @@ class IngestFileResultImporter(EntityImporter):
release_ids=[release_ident],
urls=urls,
)
- if request.get('edit_extra'):
- fe.edit_extra = request['edit_extra']
- else:
- fe.edit_extra = dict()
- if request.get('ingest_request_source'):
- fe.edit_extra['ingest_request_source'] = request['ingest_request_source']
- if request.get('link_source') and request.get('link_source_id'):
- fe.edit_extra['link_source'] = request['link_source']
- fe.edit_extra['link_source_id'] = request['link_source_id']
- if not fe.edit_extra:
- fe.edit_extra = None
+
+ edit_extra = self.parse_edit_extra(row)
+ if edit_extra:
+ fe.edit_extra = edit_extra
return fe
def try_update(self, fe):
@@ -270,6 +319,9 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
def want(self, row):
+ if not self.want_file(row):
+ return False
+
source = row['request'].get('ingest_request_source')
if not source:
self.counts['skip-ingest_request_source'] += 1
@@ -280,12 +332,6 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
if row.get('hit') != True:
self.counts['skip-hit'] += 1
return False
- if not row.get('file_meta'):
- self.counts['skip-file-meta'] += 1
- return False
- if self.require_grobid and row.get('grobid', {}).get('status_code') != 200:
- self.counts['skip-grobid'] += 1
- return False
return True
@@ -306,3 +352,149 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
description=self.editgroup_description,
extra=self.editgroup_extra),
entity_list=batch))
+
+class IngestWebResultImporter(IngestFileResultImporter):
+ """
+ Variant of IngestFileResultImporter for processing HTML ingest requests
+ into webcapture objects.
+ """
+
+ def __init__(self, api, **kwargs):
+
+ eg_desc = kwargs.pop('editgroup_description', None) or "WebCaptures crawled from web using sandcrawler ingest tool"
+ eg_extra = kwargs.pop('editgroup_extra', dict())
+ eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestWebResultImporter')
+ kwargs['do_updates'] = False
+ super().__init__(api,
+ editgroup_description=eg_desc,
+ editgroup_extra=eg_extra,
+ **kwargs)
+
+ def want(self, row):
+
+ if not self.want_ingest(row):
+ return False
+
+ if not row.get('file_meta'):
+ self.counts['skip-file-meta'] += 1
+ return False
+
+ # webcapture-specific filters
+ if row['request'].get('ingest_type') != 'html':
+ self.counts['skip-ingest-type'] += 1
+ return False
+ if row['file_meta'].get('mimetype') not in ("text/html", "application/html"):
+ self.counts['skip-mimetype'] += 1
+ return False
+
+ return True
+
+
+ def parse_record(self, row):
+ """
+ TODO: more of this parsing could be DRY with the file version
+ """
+
+ request = row['request']
+ file_meta = row['file_meta']
+
+ # double check that want() filtered request correctly (eg, old requests)
+ if request.get('ingest_type') != "html":
+ self.counts['skip-ingest-type'] += 1
+ return None
+ if file_meta['mimetype'] not in ("text/html", "application/html"):
+ self.counts['skip-mimetype'] += 1
+ return None
+
+ # identify release by fatcat ident, or extid lookup
+ release_ident = self.parse_ingest_release_ident(row)
+
+ if not release_ident:
+ self.counts['skip-release-not-found'] += 1
+ return None
+
+ terminal = self.parse_terminal(row)
+ urls = self.parse_urls(row, terminal)
+ archive_urls = [u for u in urls if u['rel'] == 'webarchive']
+
+ if terminal['terminal_status_code'] != 200:
+ self.counts['skip-terminal-status-code'] += 1
+ return None
+
+ terminal_cdx = row['cdx']
+ if 'revisit_cdx' in row:
+ terminal_cdx = row['revisit_cdx']
+ assert terminal_cdx['surt']
+ assert terminal_cdx['url'] == terminal['terminal_url']
+
+ wc_cdx = []
+ # primary resource first
+ wc_cdx.append(fatcat_openapi_client.WebcaptureCdxLine(
+ surt=terminal['terminal_surt'], # XXX: from CDX?
+ timestamp=terminal['terminal_dt'], # as an ISO datetime
+ url=terminal['terminal_url'],
+ mimetype=file_meta['mimetype'],
+ status_code=terminal['terminal_status_code'],
+ sha1=file_meta['sha1hex'],
+ sha256=file_meta['sha256hex'],
+ size=file_meta['size_bytes'],
+ ))
+
+ for resource in row.get('html_resources', []):
+ wc_cdx.append(fatcat_openapi_client.WebcaptureCdxLine(
+ surt=resource['surt'],
+ timestamp=resource['timestamp'],
+ url=resource['url'],
+ mimetype=resource.get('mimetype'),
+ size=resource.get('size_bytes'),
+ sha1=resource.get('sha1hex'),
+ sha256=resource.get('sha256hex'),
+ ))
+
+ wc = fatcat_openapi_client.WebCaptureEntity(
+ cdx=wc_cdx,
+ archive_urls=archive_urls,
+ original_url=terminal['terminal_url'],
+ timestamp=terminal['terminal_dt'],
+ release_ids=[release_ident],
+ urls=urls,
+ )
+
+ edit_extra = self.parse_edit_extra(row)
+
+ if edit_extra:
+ wc.edit_extra = edit_extra
+ return wc
+
+
+ def try_update(self, wc):
+
+ # check for existing edits-in-progress with same file hash
+ for other in self._entity_queue:
+ if other.sha1 == wc.sha1:
+ self.counts['skip-in-queue'] += 1
+ return False
+
+ # lookup sha1, or create new entity
+ existing = None
+ # XXX: lookup *release* instead; skip if any existing web capture entities
+ # XXX: only one release per webcapture
+ try:
+ existing = self.api.lookup_file(sha1=wc.sha1)
+ except fatcat_openapi_client.rest.ApiException as err:
+ if err.status != 404:
+ raise err
+
+ if not existing:
+ return True
+ else:
+ # TODO: for now, never update
+ self.counts['skip-update-disabled'] += 1
+ return False
+
+ def insert_batch(self, batch):
+ self.api.create_webcapture_auto_batch(fatcat_openapi_client.WebCaptureAutoBatch(
+ editgroup=fatcat_openapi_client.Editgroup(
+ description=self.editgroup_description,
+ extra=self.editgroup_extra),
+ entity_list=batch))