path: root/python/sandcrawler/ingest.py
Diffstat (limited to 'python/sandcrawler/ingest.py')
-rw-r--r--  python/sandcrawler/ingest.py  367
1 file changed, 257 insertions(+), 110 deletions(-)
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index 6d8b162..0c8eee6 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -5,16 +5,25 @@ import gzip
import time
import base64
import requests
+from typing import Optional, Tuple, Any, Dict, List
from http.server import BaseHTTPRequestHandler, HTTPServer
from collections import namedtuple
+from selectolax.parser import HTMLParser
-from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult
+from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding
from sandcrawler.grobid import GrobidClient
from sandcrawler.pdfextract import process_pdf, PdfExtractResult
-from sandcrawler.misc import gen_file_metadata, clean_url
+from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime
from sandcrawler.html import extract_fulltext_url
+from sandcrawler.html_ingest import fetch_html_resources, \
+ quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \
+ WebResource
+from sandcrawler.html_metadata import html_extract_fulltext_url, \
+ XML_FULLTEXT_PATTERNS, HTML_FULLTEXT_PATTERNS, BiblioMetadata, \
+ html_extract_resources, html_extract_biblio, load_adblock_rules
from sandcrawler.workers import SandcrawlerWorker
from sandcrawler.db import SandcrawlerPostgrestClient
+from sandcrawler.xml import xml_reserialize
class IngestFileWorker(SandcrawlerWorker):
@@ -46,7 +55,7 @@ class IngestFileWorker(SandcrawlerWorker):
def __init__(self, sink=None, **kwargs):
super().__init__()
-
+
self.sink = sink
self.wayback_client = kwargs.get('wayback_client')
if not self.wayback_client:
@@ -63,12 +72,18 @@ class IngestFileWorker(SandcrawlerWorker):
self.grobid_sink = kwargs.get('grobid_sink')
self.thumbnail_sink = kwargs.get('thumbnail_sink')
self.pdftext_sink = kwargs.get('pdftext_sink')
+ self.xmldoc_sink = kwargs.get('xmldoc_sink')
+ self.htmlteixml_sink = kwargs.get('htmlteixml_sink')
+ self.max_hops = 6
self.try_existing_ingest = kwargs.get('try_existing_ingest', False)
self.try_existing_grobid = kwargs.get('try_existing_grobid', True)
self.try_existing_pdfextract = kwargs.get('try_existing_pdfextract', True)
self.try_wayback = kwargs.get('try_wayback', True)
self.try_spn2 = kwargs.get('try_spn2', True)
+ self.html_quick_mode = False
+ self.adblock_rules = load_adblock_rules()
+ self.max_html_resources = 200
self.base_url_blocklist = [
# robot blocking
@@ -76,8 +91,10 @@ class IngestFileWorker(SandcrawlerWorker):
# temporary, until we implement specific fetch and 'petabox' output
"://archive.org/",
+ "://www.archive.org/",
"://web.archive.org/web/",
"://openlibrary.org/",
+ "://www.openlibrary.org/",
"://fatcat.wiki/",
# Domain squats
@@ -135,7 +152,7 @@ class IngestFileWorker(SandcrawlerWorker):
]
- def check_existing_ingest(self, base_url):
+ def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
"""
Check in sandcrawler-db (postgres) to see if we have already ingested
this URL (ingest file result table).
@@ -147,14 +164,14 @@ class IngestFileWorker(SandcrawlerWorker):
"""
if not self.try_existing_ingest:
return None
- existing = self.pgrest_client.get_ingest_file_result(base_url)
+ existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url)
# TODO: filter on more flags?
if existing and existing['hit'] == True:
return existing
else:
return None
- def find_resource(self, url, best_mimetype=None, force_recrawl=False):
+ def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]:
"""
Looks in wayback for a resource starting at the URL, following any
redirects. If a hit isn't found, try crawling with SPN.
@@ -183,7 +200,7 @@ class IngestFileWorker(SandcrawlerWorker):
if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000':
old_failure = True
- if self.try_spn2 and (resource == None or (resource.status == 'no-capture') or soft404 or old_failure):
+ if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure):
via = "spn2"
force_simple_get = 0
for domain in self.spn2_simple_get_domains:
@@ -191,14 +208,14 @@ class IngestFileWorker(SandcrawlerWorker):
force_simple_get = 1
break
resource = self.spn_client.crawl_resource(url, self.wayback_client, force_simple_get=force_simple_get)
- print("[FETCH {}\t] {}\t{}".format(
+ print("[FETCH {:>6}] {} {}".format(
via,
- resource.status,
- resource.terminal_url or url),
+ (resource and resource.status),
+ (resource and resource.terminal_url) or url),
file=sys.stderr)
return resource
- def process_existing(self, request, result_row):
+ def process_existing(self, request: dict, result_row: dict) -> dict:
"""
If we have an existing ingest file result, do any database fetches or
additional processing necessary to return a result.
@@ -226,16 +243,25 @@ class IngestFileWorker(SandcrawlerWorker):
}
return result
- def process_hit(self, resource, file_meta):
+ def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict:
"""
Run all the necessary processing for a new/fresh ingest hit.
"""
- return {
- 'grobid': self.process_grobid(resource, file_meta),
- 'pdf_meta': self.process_pdfextract(resource, file_meta),
- }
+ if ingest_type == "pdf":
+ return {
+ 'grobid': self.process_grobid(resource, file_meta),
+ 'pdf_meta': self.process_pdfextract(resource, file_meta),
+ }
+ elif ingest_type == "xml":
+ return {
+ 'xml_meta': self.process_xml(resource, file_meta),
+ }
+ elif ingest_type == "html":
+ return self.process_html(resource, file_meta)
+ else:
+ raise NotImplementedError(f"process {ingest_type} hit")
- def process_grobid(self, resource, file_meta):
+ def process_grobid(self, resource: ResourceResult, file_meta: dict) -> dict:
"""
Submits the resource body to GROBID for processing.
@@ -266,7 +292,7 @@ class IngestFileWorker(SandcrawlerWorker):
result.pop('key', None)
return result
- def process_pdfextract(self, resource, file_meta):
+ def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict:
"""
Extracts thumbnail and pdf_meta info from PDF.
@@ -288,13 +314,99 @@ class IngestFileWorker(SandcrawlerWorker):
if self.thumbnail_sink and result.page0_thumbnail is not None:
self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
if self.pdftext_sink:
- self.pdftext_sink.push_record(result.to_pdftext_dict())
+ self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex)
result.page0_thumbnail = None
result.text = None
result.file_meta = None
return result.to_pdftext_dict()
- def timeout_response(self, task):
+ def process_xml(self, resource: ResourceResult, file_meta: dict) -> dict:
+ """
+ Simply publishes the document to a Kafka topic.
+
+ In the future, could extract other metadata here (like body word
+ count), or attempt to fetch sub-resources.
+ """
+ if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml":
+ jats_xml = xml_reserialize(resource.body)
+ msg = dict(
+ sha1hex=file_meta["sha1hex"],
+ status="success",
+ jats_xml=jats_xml,
+ )
+ self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex'])
+ return dict(status="success")
+
+ def process_html(self, resource: ResourceResult, file_meta: dict) -> dict:
+
+ html_doc = HTMLParser(resource.body)
+ html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
+ assert html_biblio
+ html_body = html_extract_body_teixml(resource.body)
+ html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('word_count'))
+ html_biblio_dict = json.loads(html_biblio.json(exclude_none=True))
+
+ if html_scope not in ('article-fulltext', 'unknown'):
+ html_body.pop("tei_xml", None)
+ return dict(
+ status="html-body-wrong-scope",
+ html_biblio=html_biblio_dict,
+ html_scope=html_scope,
+ )
+
+ raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules)
+ if len(raw_resources) > self.max_html_resources:
+ html_body.pop("tei_xml", None)
+ return dict(
+ status="too-many-resources",
+ html_biblio=html_biblio_dict,
+ html_scope=html_scope,
+ )
+
+ when = parse_cdx_datetime(resource.cdx.datetime)
+
+ full_resources: List[WebResource] = []
+
+ partial_result = dict(
+ html_biblio=html_biblio_dict,
+ html_scope=html_scope,
+ )
+
+ try:
+ if self.html_quick_mode:
+ full_resources = quick_fetch_html_resources(raw_resources, self.wayback_client.cdx_client, when)
+ else:
+ full_resources = fetch_html_resources(raw_resources, self.wayback_client, when)
+ except PetaboxError as e:
+ partial_result['status'] = 'petabox-error'
+ partial_result['error_message'] = str(e)[:1600]
+ return partial_result
+ except CdxApiError as e:
+ partial_result['status'] = 'cdx-error'
+ partial_result['error_message'] = str(e)[:1600]
+ return partial_result
+ except WaybackError as e:
+ partial_result['status'] = 'wayback-error'
+ partial_result['error_message'] = str(e)[:1600]
+ return partial_result
+ except WaybackContentError as e:
+ partial_result['status'] = 'wayback-content-error'
+ partial_result['error_message'] = str(e)[:1600]
+ return partial_result
+
+ if self.htmlteixml_sink and html_body['status'] == "success":
+ self.htmlteixml_sink.push_record(html_body, key=file_meta['sha1hex'])
+
+ html_body.pop("tei_xml", None)
+
+ return dict(
+ html_body=html_body,
+ html_biblio=html_biblio_dict,
+ scope=html_scope,
+ html_resources=[json.loads(r.json(exclude_none=True)) for r in full_resources],
+ )
+
+ def timeout_response(self, task: dict) -> dict:
print("[TIMEOUT]", file=sys.stderr)
return dict(
request=task,
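
A hedged sketch of the same helper chain the new process_html() method wires together above, run outside the worker; it assumes the sandcrawler modules imported at the top of this diff, plus an HTML body and terminal URL already fetched from wayback.

import json
from selectolax.parser import HTMLParser
from sandcrawler.html_ingest import html_extract_body_teixml, html_guess_scope
from sandcrawler.html_metadata import html_extract_biblio, html_extract_resources, load_adblock_rules

def summarize_html(terminal_url: str, body: bytes) -> dict:
    # parse once, then reuse the document for biblio, scope, and sub-resource extraction
    html_doc = HTMLParser(body)
    html_biblio = html_extract_biblio(terminal_url, html_doc)
    html_body = html_extract_body_teixml(body)
    html_scope = html_guess_scope(terminal_url, html_doc, html_biblio, html_body.get('word_count'))
    raw_resources = html_extract_resources(terminal_url, html_doc, load_adblock_rules())
    return dict(
        html_scope=html_scope,
        word_count=html_body.get('word_count'),
        resource_count=len(raw_resources),
        html_biblio=json.loads(html_biblio.json(exclude_none=True)) if html_biblio else None,
    )
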
@@ -303,22 +415,20 @@ class IngestFileWorker(SandcrawlerWorker):
error_message="ingest worker internal timeout",
)
- def want(self, request):
- if not request.get('ingest_type') in ('file', 'pdf'):
+ def want(self, request: dict) -> bool:
+ if not request.get('ingest_type') in ('file', 'pdf', 'xml', 'html'):
return False
return True
- def process(self, request, key=None):
+ def process(self, request: dict, key: Any = None) -> dict:
- # backwards compatibility
- if request.get('ingest_type') in ('file', None):
+ # old backwards compatibility
+ if request.get('ingest_type') == 'file':
request['ingest_type'] = 'pdf'
- # for now, only pdf ingest is implemented
- if not 'ingest_type' in request:
- request['ingest_type'] = "pdf"
- assert request.get('ingest_type') == "pdf"
ingest_type = request.get('ingest_type')
+ if ingest_type not in ("pdf", "xml", "html"):
+ raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
# parse/clean URL
# note that we pass through the original/raw URL, and that is what gets
@@ -329,25 +439,27 @@ class IngestFileWorker(SandcrawlerWorker):
for block in self.base_url_blocklist:
if block in base_url:
- print("[SKIP {}\t] {}".format(ingest_type, base_url), file=sys.stderr)
+ print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
return dict(request=request, hit=False, status="skip-url-blocklist")
- print("[INGEST {}\t] {}".format(ingest_type, base_url), file=sys.stderr)
+ print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
best_mimetype = None
if ingest_type == "pdf":
best_mimetype = "application/pdf"
+ elif ingest_type == "xml":
+ best_mimetype = "text/xml"
+ elif ingest_type == "html":
+ best_mimetype = "text/html"
- existing = self.check_existing_ingest(base_url)
+ existing = self.check_existing_ingest(ingest_type, base_url)
if existing:
return self.process_existing(request, existing)
- result = dict(request=request, hit=False)
+ result: Dict[str, Any] = dict(request=request, hit=False)
next_url = base_url
hops = [base_url]
- self.max_hops = 6
-
while len(hops) <= self.max_hops:
@@ -400,25 +512,9 @@ class IngestFileWorker(SandcrawlerWorker):
result['error_message'] = str(e)[:1600]
return result
- if not resource.hit:
- result['status'] = resource.status
- if resource.terminal_url:
- result['terminal'] = {
- "terminal_url": resource.terminal_url,
- "terminal_dt": resource.terminal_dt,
- "terminal_status_code": resource.terminal_status_code,
- }
- if resource.terminal_url not in result['hops']:
- result['hops'].append(resource.terminal_url)
- return result
-
- if not resource.body:
- result['status'] = 'null-body'
- return result
- file_meta = gen_file_metadata(resource.body)
+ assert resource
- if resource.terminal_url and ('/cookieAbsent' in next_url or 'cookieSet=1' in resource.terminal_url):
- result['status'] = 'blocked-cookie'
+ if resource.terminal_url:
result['terminal'] = {
"terminal_url": resource.terminal_url,
"terminal_dt": resource.terminal_dt,
@@ -426,53 +522,47 @@ class IngestFileWorker(SandcrawlerWorker):
}
if resource.terminal_url not in result['hops']:
result['hops'].append(resource.terminal_url)
+
+ if not resource.hit:
+ result['status'] = resource.status
return result
- # crude handling of content-encoding; wayback fetch library usually
- # (and should always?) handle this
- if file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
- print("transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr)
- try:
- inner_body = gzip.decompress(resource.body)
- except Exception as e:
- result['status'] = 'bad-gzip-encoding'
- result['error_message'] = str(e)
- return result
- if not inner_body:
- result['status'] = 'null-body'
- return result
- resource = ResourceResult(
- body=inner_body,
- # copy all other fields
- start_url=resource.start_url,
- hit=resource.hit,
- status=resource.status,
- terminal_url=resource.terminal_url,
- terminal_dt=resource.terminal_dt,
- terminal_status_code=resource.terminal_status_code,
- cdx=resource.cdx,
- revisit_cdx=resource.revisit_cdx,
- )
- file_meta = gen_file_metadata(resource.body)
-
- if "html" in file_meta['mimetype'] or "xhtml" in file_meta['mimetype'] or "application/xml" in file_meta['mimetype'] or "text/xml" in file_meta['mimetype']:
+ if resource.terminal_url and ('/cookieAbsent' in next_url or 'cookieSet=1' in resource.terminal_url):
+ result['status'] = 'blocked-cookie'
+ return result
+
+ file_meta = gen_file_metadata(resource.body)
+ try:
+ file_meta, resource = fix_transfer_encoding(file_meta, resource)
+ except Exception as e:
+ result['status'] = 'bad-gzip-encoding'
+ result['error_message'] = str(e)
+ return result
+
+ if not resource.body or file_meta['size_bytes'] == 0:
+ result['status'] = 'null-body'
+ return result
+
+ # here we split based on ingest type to try and extract a next hop
+ html_ish_resource = bool(
+ "html" in file_meta['mimetype']
+ or "xhtml" in file_meta['mimetype']
+ or "application/xml" in file_meta['mimetype']
+ or "text/xml" in file_meta['mimetype']
+ )
+ if ingest_type == "pdf" and html_ish_resource:
# Got landing page or similar. Some XHTML detected as "application/xml"
- if resource.terminal_dt:
- result['terminal'] = {
- "terminal_url": resource.terminal_url,
- "terminal_dt": resource.terminal_dt,
- "terminal_status_code": resource.terminal_status_code,
- }
fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
-
- result['html'] = fulltext_url
+ result['extract_next_hop'] = fulltext_url
+
if not fulltext_url:
result['status'] = 'no-pdf-link'
return result
next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url')
assert next_url
next_url = clean_url(next_url)
- print("[PARSE\t] {}\t{}".format(
+ print("[PARSE {:>6}] {} {}".format(
+ ingest_type,
fulltext_url.get('technique'),
next_url,
),
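
A hedged reconstruction of what the new fix_transfer_encoding() helper from sandcrawler.ia presumably does, based on the inline gzip handling it replaces in the hunk above; the real implementation may differ.

import gzip
from typing import Tuple
from sandcrawler.ia import ResourceResult
from sandcrawler.misc import gen_file_metadata

def fix_transfer_encoding_sketch(file_meta: dict, resource: ResourceResult) -> Tuple[dict, ResourceResult]:
    # only act when file sniffing says gzip but the CDX record says otherwise,
    # i.e. the transfer encoding was not stripped during the wayback fetch;
    # a bad gzip stream raises, which the caller maps to 'bad-gzip-encoding'
    if file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
        inner_body = gzip.decompress(resource.body)
        resource = ResourceResult(
            body=inner_body,
            # copy all other fields unchanged
            start_url=resource.start_url,
            hit=resource.hit,
            status=resource.status,
            terminal_url=resource.terminal_url,
            terminal_dt=resource.terminal_dt,
            terminal_status_code=resource.terminal_status_code,
            cdx=resource.cdx,
            revisit_cdx=resource.revisit_cdx,
        )
        file_meta = gen_file_metadata(inner_body)
    return file_meta, resource
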
@@ -483,7 +573,44 @@ class IngestFileWorker(SandcrawlerWorker):
return result
hops.append(next_url)
continue
-
+ elif ingest_type == "xml" and html_ish_resource:
+ # parse with selectolax, extract XML fulltext URL
+ html_doc = HTMLParser(resource.body)
+ extract_next_hop = html_extract_fulltext_url(resource.terminal_url, html_doc, XML_FULLTEXT_PATTERNS)
+ if extract_next_hop:
+ next_url = extract_next_hop[0]
+ technique = extract_next_hop[1]
+ print("[PARSE {:>6}] {} {}".format(
+ ingest_type,
+ technique,
+ next_url,
+ ),
+ file=sys.stderr)
+ if next_url in hops:
+ result['status'] = 'link-loop'
+ result['error_message'] = "repeated: {}".format(next_url)
+ return result
+ hops.append(next_url)
+ continue
+ elif ingest_type == "html" and html_ish_resource:
+ # parse with selectolax, extract HTML fulltext URL
+ html_doc = HTMLParser(resource.body)
+ extract_next_hop = html_extract_fulltext_url(resource.terminal_url, html_doc, HTML_FULLTEXT_PATTERNS)
+ if extract_next_hop:
+ next_url = extract_next_hop[0]
+ technique = extract_next_hop[1]
+ if next_url in hops:
+ # for HTML ingest, we don't count this as a link-loop
+ break
+ print("[PARSE {:>6}] {} {}".format(
+ ingest_type,
+ technique,
+ next_url,
+ ),
+ file=sys.stderr)
+ hops.append(next_url)
+ continue
+
# default is to NOT keep hopping
break
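
The html_extract_fulltext_url() calls above return an optional (url, technique) pair by trying a list of patterns against the parsed page. A hedged illustration of that selector-pattern technique follows; the two example patterns are invented for illustration, and the real XML_FULLTEXT_PATTERNS and HTML_FULLTEXT_PATTERNS live in sandcrawler.html_metadata.

from typing import List, Optional, Tuple
from urllib.parse import urljoin
from selectolax.parser import HTMLParser

EXAMPLE_PATTERNS: List[dict] = [
    # hypothetical patterns, for illustration only
    {"selector": "meta[name='citation_xml_url']", "attr": "content", "technique": "citation_xml_url"},
    {"selector": "link[rel='alternate'][type='application/xml']", "attr": "href", "technique": "alternate_link"},
]

def extract_next_hop_sketch(base_url: str, html_doc: HTMLParser, patterns: List[dict]) -> Optional[Tuple[str, str]]:
    for pattern in patterns:
        node = html_doc.css_first(pattern["selector"])
        if node is None:
            continue
        val = node.attributes.get(pattern["attr"])
        if val:
            # resolve relative links against the terminal URL; first match wins
            return (urljoin(base_url, val), pattern["technique"])
    return None
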
@@ -491,6 +618,11 @@ class IngestFileWorker(SandcrawlerWorker):
result['status'] = "max-hops-exceeded"
return result
+ # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
+ assert resource
+ assert resource.hit == True
+ assert resource.terminal_status_code in (200, 226)
+
if resource.terminal_url:
result['terminal'] = {
"terminal_url": resource.terminal_url,
@@ -499,35 +631,50 @@ class IngestFileWorker(SandcrawlerWorker):
"terminal_sha1hex": file_meta['sha1hex'],
}
- # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
- assert resource.hit == True
- assert resource.terminal_status_code in (200, 226)
-
result['file_meta'] = file_meta
result['cdx'] = cdx_to_dict(resource.cdx)
if resource.revisit_cdx:
result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
- # other failure cases
- if not resource.body or file_meta['size_bytes'] == 0:
- result['status'] = 'null-body'
- return result
-
- if not (resource.hit and file_meta['mimetype'] == "application/pdf"):
- result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
- return result
+ if ingest_type == "pdf":
+ if file_meta['mimetype'] != "application/pdf":
+ result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
+ return result
+ elif ingest_type == "xml":
+ if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
+ result['status'] = "wrong-mimetype"
+ return result
+ elif ingest_type == "html":
+ if file_meta['mimetype'] not in ("text/html",):
+ result['status'] = "wrong-mimetype"
+ return result
+ else:
+ raise NotImplementedError()
- info = self.process_hit(resource, file_meta)
+ info = self.process_hit(ingest_type, resource, file_meta)
result.update(info)
+ # check if processing turned up an error
+ if info.get('status') not in ('success', None):
+ result['status'] = info['status']
+ return result
+
result['status'] = "success"
result['hit'] = True
- print("[SUCCESS\t] sha1:{} grobid:{} pdfextract:{}".format(
- result.get('file_meta', {}).get('sha1hex'),
- result.get('grobid', {}).get('status_code'),
- result.get('pdf_meta', {}).get('status'),
- ),
- file=sys.stderr)
+ if ingest_type == "pdf":
+ print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format(
+ ingest_type,
+ result.get('file_meta', {}).get('sha1hex'),
+ result.get('grobid', {}).get('status_code'),
+ result.get('pdf_meta', {}).get('status'),
+ ),
+ file=sys.stderr)
+ else:
+ print("[SUCCESS {:>5}] sha1:{}".format(
+ ingest_type,
+ result.get('file_meta', {}).get('sha1hex'),
+ ),
+ file=sys.stderr)
return result
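
A hedged usage sketch of the reworked worker: process() now dispatches on ingest_type rather than assuming PDF. The 'base_url' request key and the constructor defaults are assumed from surrounding sandcrawler conventions, not shown in this diff.

from sandcrawler.ingest import IngestFileWorker

worker = IngestFileWorker(try_spn2=False)  # wayback-only; skip save-page-now crawling
requests = [
    {"ingest_type": "pdf", "base_url": "https://example.com/paper.pdf"},
    {"ingest_type": "xml", "base_url": "https://example.com/article.jats.xml"},
    {"ingest_type": "html", "base_url": "https://example.com/article.html"},
]
for request in requests:
    result = worker.process(request)
    print(result.get("status"), result.get("hit"), result.get("terminal", {}).get("terminal_url"))
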