aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/importers/ingest.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/importers/ingest.py')
-rw-r--r--python/fatcat_tools/importers/ingest.py329
1 files changed, 280 insertions, 49 deletions
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index 4b1d3702..4fbd19f1 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -1,4 +1,6 @@
+import datetime
+
import fatcat_openapi_client
from .common import EntityImporter, make_rel_url
@@ -20,10 +22,10 @@ class IngestFileResultImporter(EntityImporter):
assert self.default_link_rel
self.require_grobid = require_grobid
if self.require_grobid:
- print("Requiring GROBID status == 200")
+ print("Requiring GROBID status == 200 (for PDFs)")
else:
print("NOT checking GROBID success")
- self.ingest_request_source_whitelist = [
+ self.ingest_request_source_allowlist = [
'fatcat-changelog',
'fatcat-ingest-container',
'fatcat-ingest',
@@ -35,23 +37,41 @@ class IngestFileResultImporter(EntityImporter):
's2-corpus',
's2',
]
- if kwargs.get('skip_source_whitelist', False):
- self.ingest_request_source_whitelist = []
+ if kwargs.get('skip_source_allowlist', False):
+ self.ingest_request_source_allowlist = []
- def want(self, row):
+ def want_file(self, row) -> bool:
+ """
+ File-specific part of want(). Generic across general ingest and save-paper-now.
"""
- Logic here probably needs work (TODO):
- - Direct ingests via DOI from fatcat-changelog should probably go
- through regardless of GROBID status
- - We should filter/block things like single-page PDFs here
- - public/anonymous submissions could require successful biblio-glutton
- match, or some other sanity check on the fatcat side (eg, fuzzy title
- match)
- - handle the case of release_stage not being 'published'; if pre-print,
- potentially create a new release.
+ if not row.get('file_meta'):
+ self.counts['skip-file-meta'] += 1
+ return False
- The current logic is intentionally conservative as a first step.
+ # type-specific filters
+ if row['request'].get('ingest_type') == 'pdf':
+ if self.require_grobid and row.get('grobid', {}).get('status_code') != 200:
+ self.counts['skip-grobid'] += 1
+ return False
+ if row['file_meta'].get('mimetype') not in ("application/pdf",):
+ self.counts['skip-mimetype'] += 1
+ return False
+ elif row['request'].get('ingest_type') == 'xml':
+ if row['file_meta'].get('mimetype') not in ("application/xml",
+ "application/jats+xml", "application/tei+xml", "text/xml"):
+ self.counts['skip-mimetype'] += 1
+ return False
+ else:
+ self.counts['skip-ingest-type'] += 1
+ return False
+
+ return True
+
+ def want_ingest(self, row) -> bool:
+ """
+ Sandcrawler ingest-specific part of want(). Generic across file and
+ webcapture ingest.
"""
if row.get('hit') != True:
self.counts['skip-hit'] += 1
@@ -60,33 +80,48 @@ class IngestFileResultImporter(EntityImporter):
if not source:
self.counts['skip-ingest_request_source'] += 1
return False
- if self.ingest_request_source_whitelist and source not in self.ingest_request_source_whitelist:
+ if self.ingest_request_source_allowlist and source not in self.ingest_request_source_allowlist:
self.counts['skip-ingest_request_source'] += 1
return False
- if source.startswith('arabesque'):
- if row['request'].get('link_source') not in ('arxiv', 'pmc', 'unpaywall', 'doi', 'mag', 's2'):
- self.counts['skip-arabesque-source'] += 1
- return False
+
+ if row['request'].get('link_source') not in ('arxiv', 'pmc', 'unpaywall', 'doi', 'mag', 's2'):
+ self.counts['skip-link-source'] += 1
+ return False
+
if source.startswith('savepapernow'):
# never process async savepapernow requests
self.counts['skip-savepapernow'] += 1
return False
- if not row.get('file_meta'):
- self.counts['skip-file-meta'] += 1
+
+ return True
+
+ def want(self, row):
+ """
+ Overall logic here probably needs work (TODO):
+
+ - Direct ingests via DOI from fatcat-changelog should probably go
+ through regardless of GROBID status
+ - We should filter/block things like single-page PDFs here
+ - public/anonymous submissions could require successful biblio-glutton
+ match, or some other sanity check on the fatcat side (eg, fuzzy title
+ match)
+ - handle the case of release_stage not being 'published'; if pre-print,
+ potentially create a new release.
+
+ The current logic is intentionally conservative as a first step.
+ """
+ if not self.want_file(row):
return False
- if self.require_grobid and row.get('grobid', {}).get('status_code') != 200:
- self.counts['skip-grobid'] += 1
+ if not self.want_ingest(row):
return False
return True
- def parse_record(self, row):
+ def parse_ingest_release_ident(self, row):
request = row['request']
fatcat = request.get('fatcat')
- file_meta = row['file_meta']
- # identify release by fatcat ident, or extid lookup, or biblio-glutton match
release_ident = None
if fatcat and fatcat.get('release_ident'):
release_ident = fatcat.get('release_ident')
@@ -112,23 +147,21 @@ class IngestFileResultImporter(EntityImporter):
return None
release_ident = release.ident
break
+
if self.use_glutton_match and not release_ident and row.get('grobid'):
# try biblio-glutton extracted hit
if row['grobid'].get('fatcat_release'):
release_ident = row['grobid']['fatcat_release'].split('_')[-1]
self.counts['glutton-match'] += 1
- if not release_ident:
- self.counts['skip-release-not-found'] += 1
- return None
+ return release_ident
+ def parse_terminal(self, row):
terminal = row.get('terminal')
if not terminal:
# support old cdx-only ingest results
cdx = row.get('cdx')
if not cdx:
- # TODO: support archive.org hits?
- self.counts['skip-no-terminal'] += 1
return None
else:
terminal = {
@@ -142,7 +175,15 @@ class IngestFileResultImporter(EntityImporter):
terminal['terminal_url'] = terminal['url']
if not 'terminal_dt' in terminal:
terminal['terminal_dt'] = terminal['dt']
+
+ # convert CDX-style digits to ISO-style timestamp
assert len(terminal['terminal_dt']) == 14
+ terminal['terminal_timestamp'] = datetime.datetime.strptime(terminal['terminal_dt'], "%Y%m%d%H%M%S").isoformat() + "Z"
+ return terminal
+
+ def parse_urls(self, row, terminal):
+
+ request = row['request']
default_rel = self.default_link_rel
if request.get('link_source') == 'doi':
@@ -159,6 +200,55 @@ class IngestFileResultImporter(EntityImporter):
urls = [url, ("webarchive", wayback)]
urls = [fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in urls]
+ return urls
+
+ def parse_edit_extra(self, row):
+
+ request = row['request']
+ edit_extra = dict()
+
+ if request.get('edit_extra'):
+ edit_extra = request['edit_extra']
+
+ if request.get('ingest_request_source'):
+ edit_extra['ingest_request_source'] = request['ingest_request_source']
+ if request.get('link_source') and request.get('link_source_id'):
+ edit_extra['link_source'] = request['link_source']
+ edit_extra['link_source_id'] = request['link_source_id']
+
+ return edit_extra
+
+ def parse_record(self, row):
+
+ request = row['request']
+ file_meta = row['file_meta']
+
+ # double check that want() filtered request correctly (eg, old requests)
+ if request.get('ingest_type') not in ('pdf', 'xml'):
+ self.counts['skip-ingest-type'] += 1
+ return None
+ assert (request['ingest_type'], file_meta['mimetype']) in [
+ ("pdf", "application/pdf"),
+ ("xml", "application/xml"),
+ ("xml", "application/jats+xml"),
+ ("xml", "application/tei+xml"),
+ ("xml", "text/xml"),
+ ]
+
+ # identify release by fatcat ident, or extid lookup, or biblio-glutton match
+ release_ident = self.parse_ingest_release_ident(row)
+
+ if not release_ident:
+ self.counts['skip-release-not-found'] += 1
+ return None
+
+ terminal = self.parse_terminal(row)
+ if not terminal:
+ # TODO: support archive.org hits?
+ self.counts['skip-no-terminal'] += 1
+ return None
+
+ urls = self.parse_urls(row, terminal)
fe = fatcat_openapi_client.FileEntity(
md5=file_meta['md5hex'],
@@ -169,17 +259,10 @@ class IngestFileResultImporter(EntityImporter):
release_ids=[release_ident],
urls=urls,
)
- if request.get('edit_extra'):
- fe.edit_extra = request['edit_extra']
- else:
- fe.edit_extra = dict()
- if request.get('ingest_request_source'):
- fe.edit_extra['ingest_request_source'] = request['ingest_request_source']
- if request.get('link_source') and request.get('link_source_id'):
- fe.edit_extra['link_source'] = request['link_source']
- fe.edit_extra['link_source_id'] = request['link_source_id']
- if not fe.edit_extra:
- fe.edit_extra = None
+
+ edit_extra = self.parse_edit_extra(row)
+ if edit_extra:
+ fe.edit_extra = edit_extra
return fe
def try_update(self, fe):
@@ -244,6 +327,9 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
def want(self, row):
+ if not self.want_file(row):
+ return False
+
source = row['request'].get('ingest_request_source')
if not source:
self.counts['skip-ingest_request_source'] += 1
@@ -254,12 +340,6 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
if row.get('hit') != True:
self.counts['skip-hit'] += 1
return False
- if not row.get('file_meta'):
- self.counts['skip-file-meta'] += 1
- return False
- if self.require_grobid and row.get('grobid', {}).get('status_code') != 200:
- self.counts['skip-grobid'] += 1
- return False
return True
@@ -280,3 +360,154 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
description=self.editgroup_description,
extra=self.editgroup_extra),
entity_list=batch))
+
+
+class IngestWebResultImporter(IngestFileResultImporter):
+ """
+ Variant of IngestFileResultImporter for processing HTML ingest requests
+ into webcapture objects.
+ """
+
+ def __init__(self, api, **kwargs):
+
+ eg_desc = kwargs.pop('editgroup_description', None) or "Webcaptures crawled from web using sandcrawler ingest tool"
+ eg_extra = kwargs.pop('editgroup_extra', dict())
+ eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestWebResultImporter')
+ kwargs['do_updates'] = False
+ super().__init__(api,
+ editgroup_description=eg_desc,
+ editgroup_extra=eg_extra,
+ **kwargs)
+
+ def want(self, row):
+
+ if not self.want_ingest(row):
+ return False
+
+ if not row.get('file_meta'):
+ self.counts['skip-file-meta'] += 1
+ return False
+
+ # webcapture-specific filters
+ if row['request'].get('ingest_type') != 'html':
+ self.counts['skip-ingest-type'] += 1
+ return False
+ if row['file_meta'].get('mimetype') not in ("text/html", "application/xhtml+xml"):
+ self.counts['skip-mimetype'] += 1
+ return False
+
+ return True
+
+ def parse_record(self, row):
+
+ request = row['request']
+ file_meta = row['file_meta']
+
+ # double check that want() filtered request correctly (eg, old requests)
+ if request.get('ingest_type') != "html":
+ self.counts['skip-ingest-type'] += 1
+ return None
+ if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"):
+ self.counts['skip-mimetype'] += 1
+ return None
+
+ # identify release by fatcat ident, or extid lookup
+ release_ident = self.parse_ingest_release_ident(row)
+
+ if not release_ident:
+ self.counts['skip-release-not-found'] += 1
+ return None
+
+ terminal = self.parse_terminal(row)
+ if not terminal:
+ # TODO: support archive.org hits?
+ self.counts['skip-no-terminal'] += 1
+ return None
+
+ urls = self.parse_urls(row, terminal)
+ archive_urls = [u for u in urls if u.rel == 'webarchive']
+
+ if terminal['terminal_status_code'] != 200:
+ self.counts['skip-terminal-status-code'] += 1
+ return None
+
+ terminal_cdx = row['cdx']
+ if 'revisit_cdx' in row:
+ terminal_cdx = row['revisit_cdx']
+ assert terminal_cdx['surt']
+ assert terminal_cdx['url'] == terminal['terminal_url']
+
+ wc_cdx = []
+ # primary resource first
+ wc_cdx.append(fatcat_openapi_client.WebcaptureCdxLine(
+ surt=terminal_cdx['surt'],
+ timestamp=terminal['terminal_timestamp'],
+ url=terminal['terminal_url'],
+ mimetype=file_meta['mimetype'],
+ status_code=terminal['terminal_status_code'],
+ sha1=file_meta['sha1hex'],
+ sha256=file_meta['sha256hex'],
+ size=file_meta['size_bytes'],
+ ))
+
+ for resource in row.get('html_resources', []):
+ timestamp = resource['timestamp']
+ if not "+" in timestamp and not "Z" in timestamp:
+ timestamp += "Z"
+ wc_cdx.append(fatcat_openapi_client.WebcaptureCdxLine(
+ surt=resource['surt'],
+ timestamp=timestamp,
+ url=resource['url'],
+ mimetype=resource.get('mimetype'),
+ size=resource.get('size'),
+ sha1=resource.get('sha1hex'),
+ sha256=resource.get('sha256hex'),
+ ))
+
+ wc = fatcat_openapi_client.WebcaptureEntity(
+ cdx=wc_cdx,
+ archive_urls=archive_urls,
+ original_url=terminal['terminal_url'],
+ timestamp=terminal['terminal_timestamp'],
+ release_ids=[release_ident],
+ )
+
+ edit_extra = self.parse_edit_extra(row)
+
+ if edit_extra:
+ wc.edit_extra = edit_extra
+ return wc
+
+ def try_update(self, wc):
+
+ # check for existing edits-in-progress with same file hash
+ for other in self._entity_queue:
+ if other.sha1 == wc.sha1:
+ self.counts['skip-in-queue'] += 1
+ return False
+
+ # lookup sha1, or create new entity (TODO: API doesn't support this yet)
+ #existing = None
+
+ # TODO: currently only allow one release per webcapture
+ release = self.api.get_release(wc.release_ids[0], expand="webcaptures")
+ if release.webcaptures:
+ # check if this is an existing match, or just a similar hit
+ for other in release.webcaptures:
+ if wc.original_url == other.original_url:
+ # TODO: compare very similar timestamps of same time (different formats)
+ self.counts['exists'] += 1
+ return False
+ self.counts['skip-release-has-webcapture'] += 1
+ return False
+
+ # TODO: for now, never update
+ self.counts['skip-update-disabled'] += 1
+ return False
+
+ def insert_batch(self, batch):
+ self.api.create_webcapture_auto_batch(fatcat_openapi_client.WebcaptureAutoBatch(
+ editgroup=fatcat_openapi_client.Editgroup(
+ description=self.editgroup_description,
+ extra=self.editgroup_extra),
+ entity_list=batch))