aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/sandcrawler/ingest.py240
1 files changed, 118 insertions, 122 deletions
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index a39d9ea..633e856 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -5,10 +5,11 @@ import gzip
import time
import base64
import requests
+from typing import Optional, Tuple, Any, Dict
from http.server import BaseHTTPRequestHandler, HTTPServer
from collections import namedtuple
-from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult
+from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding
from sandcrawler.grobid import GrobidClient
from sandcrawler.pdfextract import process_pdf, PdfExtractResult
from sandcrawler.misc import gen_file_metadata, clean_url
@@ -46,7 +47,7 @@ class IngestFileWorker(SandcrawlerWorker):
def __init__(self, sink=None, **kwargs):
super().__init__()
-
+
self.sink = sink
self.wayback_client = kwargs.get('wayback_client')
if not self.wayback_client:
@@ -63,6 +64,7 @@ class IngestFileWorker(SandcrawlerWorker):
self.grobid_sink = kwargs.get('grobid_sink')
self.thumbnail_sink = kwargs.get('thumbnail_sink')
self.pdftext_sink = kwargs.get('pdftext_sink')
+ self.max_hops = 6
self.try_existing_ingest = kwargs.get('try_existing_ingest', False)
self.try_existing_grobid = kwargs.get('try_existing_grobid', True)
@@ -137,7 +139,7 @@ class IngestFileWorker(SandcrawlerWorker):
]
- def check_existing_ingest(self, base_url):
+ def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
"""
Check in sandcrawler-db (postgres) to see if we have already ingested
this URL (ingest file result table).
@@ -149,14 +151,14 @@ class IngestFileWorker(SandcrawlerWorker):
"""
if not self.try_existing_ingest:
return None
- existing = self.pgrest_client.get_ingest_file_result(base_url)
+ existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url)
# TODO: filter on more flags?
if existing and existing['hit'] == True:
return existing
else:
return None
- def find_resource(self, url, best_mimetype=None, force_recrawl=False):
+ def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]:
"""
Looks in wayback for a resource starting at the URL, following any
redirects. If a hit isn't found, try crawling with SPN.
@@ -185,7 +187,7 @@ class IngestFileWorker(SandcrawlerWorker):
if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000':
old_failure = True
- if self.try_spn2 and (resource == None or (resource.status == 'no-capture') or soft404 or old_failure):
+ if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure):
via = "spn2"
force_simple_get = 0
for domain in self.spn2_simple_get_domains:
@@ -195,12 +197,12 @@ class IngestFileWorker(SandcrawlerWorker):
resource = self.spn_client.crawl_resource(url, self.wayback_client, force_simple_get=force_simple_get)
print("[FETCH {:>6}] {} {}".format(
via,
- resource.status,
- resource.terminal_url or url),
+ (resource and resource.status),
+ (resource and resource.terminal_url) or url),
file=sys.stderr)
return resource
- def process_existing(self, request, result_row):
+ def process_existing(self, request: dict, result_row: dict) -> dict:
"""
If we have an existing ingest file result, do any database fetches or
additional processing necessary to return a result.
@@ -228,16 +230,22 @@ class IngestFileWorker(SandcrawlerWorker):
}
return result
- def process_hit(self, resource, file_meta):
+ def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict:
"""
Run all the necessary processing for a new/fresh ingest hit.
"""
- return {
- 'grobid': self.process_grobid(resource, file_meta),
- 'pdf_meta': self.process_pdfextract(resource, file_meta),
- }
+ if ingest_type == "pdf":
+ return {
+ 'grobid': self.process_grobid(resource, file_meta),
+ 'pdf_meta': self.process_pdfextract(resource, file_meta),
+ }
+ elif ingest_type == "xml":
+ # TODO
+ raise NotImplementedError(f"process {ingest_type} hit")
+ else:
+ raise NotImplementedError(f"process {ingest_type} hit")
- def process_grobid(self, resource, file_meta):
+ def process_grobid(self, resource: ResourceResult, file_meta: dict) -> dict:
"""
Submits to resource body to GROBID for processing.
@@ -268,7 +276,7 @@ class IngestFileWorker(SandcrawlerWorker):
result.pop('key', None)
return result
- def process_pdfextract(self, resource, file_meta):
+ def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict:
"""
Extracts thumbnail and pdf_meta info from PDF.
@@ -296,7 +304,7 @@ class IngestFileWorker(SandcrawlerWorker):
result.file_meta = None
return result.to_pdftext_dict()
- def timeout_response(self, task):
+ def timeout_response(self, task: dict) -> dict:
print("[TIMEOUT]", file=sys.stderr)
return dict(
request=task,
@@ -305,22 +313,20 @@ class IngestFileWorker(SandcrawlerWorker):
error_message="ingest worker internal timeout",
)
- def want(self, request):
+ def want(self, request: dict) -> bool:
if not request.get('ingest_type') in ('file', 'pdf'):
return False
return True
- def process(self, request, key=None):
+ def process(self, request: dict, key: Any = None) -> dict:
- # backwards compatibility
- if request.get('ingest_type') in ('file', None):
+ # old backwards compatibility
+ if request.get('ingest_type') == 'file':
request['ingest_type'] = 'pdf'
- # for now, only pdf ingest is implemented
- if not 'ingest_type' in request:
- request['ingest_type'] = "pdf"
- assert request.get('ingest_type') == "pdf"
ingest_type = request.get('ingest_type')
+ if ingest_type not in ("pdf", "xml"):
+ raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
# parse/clean URL
# note that we pass through the original/raw URL, and that is what gets
@@ -339,17 +345,19 @@ class IngestFileWorker(SandcrawlerWorker):
best_mimetype = None
if ingest_type == "pdf":
best_mimetype = "application/pdf"
+ elif ingest_type == "xml":
+ best_mimetype = "text/xml"
+ elif ingest_type == "html":
+ best_mimetype = "text/html"
- existing = self.check_existing_ingest(base_url)
+ existing = self.check_existing_ingest(ingest_type, base_url)
if existing:
return self.process_existing(request, existing)
- result = dict(request=request, hit=False)
+ result: Dict[str, Any] = dict(request=request, hit=False)
next_url = base_url
hops = [base_url]
- self.max_hops = 6
-
while len(hops) <= self.max_hops:
@@ -402,25 +410,9 @@ class IngestFileWorker(SandcrawlerWorker):
result['error_message'] = str(e)[:1600]
return result
- if not resource.hit:
- result['status'] = resource.status
- if resource.terminal_url:
- result['terminal'] = {
- "terminal_url": resource.terminal_url,
- "terminal_dt": resource.terminal_dt,
- "terminal_status_code": resource.terminal_status_code,
- }
- if resource.terminal_url not in result['hops']:
- result['hops'].append(resource.terminal_url)
- return result
+ assert resource
- if not resource.body:
- result['status'] = 'null-body'
- return result
- file_meta = gen_file_metadata(resource.body)
-
- if resource.terminal_url and ('/cookieAbsent' in next_url or 'cookieSet=1' in resource.terminal_url):
- result['status'] = 'blocked-cookie'
+ if resource.terminal_url:
result['terminal'] = {
"terminal_url": resource.terminal_url,
"terminal_dt": resource.terminal_dt,
@@ -428,64 +420,61 @@ class IngestFileWorker(SandcrawlerWorker):
}
if resource.terminal_url not in result['hops']:
result['hops'].append(resource.terminal_url)
+
+ if not resource.hit:
+ result['status'] = resource.status
return result
- # crude handling of content-encoding; wayback fetch library usually
- # (and should always?) handle this
- if file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
- print("transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr)
- try:
- inner_body = gzip.decompress(resource.body)
- except Exception as e:
- result['status'] = 'bad-gzip-encoding'
- result['error_message'] = str(e)
- return result
- if not inner_body:
- result['status'] = 'null-body'
- return result
- resource = ResourceResult(
- body=inner_body,
- # copy all other fields
- start_url=resource.start_url,
- hit=resource.hit,
- status=resource.status,
- terminal_url=resource.terminal_url,
- terminal_dt=resource.terminal_dt,
- terminal_status_code=resource.terminal_status_code,
- cdx=resource.cdx,
- revisit_cdx=resource.revisit_cdx,
- )
- file_meta = gen_file_metadata(resource.body)
-
- if "html" in file_meta['mimetype'] or "xhtml" in file_meta['mimetype'] or "application/xml" in file_meta['mimetype'] or "text/xml" in file_meta['mimetype']:
- # Got landing page or similar. Some XHTML detected as "application/xml"
- if resource.terminal_dt:
- result['terminal'] = {
- "terminal_url": resource.terminal_url,
- "terminal_dt": resource.terminal_dt,
- "terminal_status_code": resource.terminal_status_code,
- }
- fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
-
- result['html'] = fulltext_url
- if not fulltext_url:
- result['status'] = 'no-pdf-link'
- return result
- next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url')
- assert next_url
- next_url = clean_url(next_url)
- print("[PARSE\t] {}\t{}".format(
- fulltext_url.get('technique'),
- next_url,
- ),
- file=sys.stderr)
- if next_url in hops:
- result['status'] = 'link-loop'
- result['error_message'] = "repeated: {}".format(next_url)
- return result
- hops.append(next_url)
- continue
-
+ if resource.terminal_url and ('/cookieAbsent' in next_url or 'cookieSet=1' in resource.terminal_url):
+ result['status'] = 'blocked-cookie'
+ return result
+
+ file_meta = gen_file_metadata(resource.body)
+ try:
+ file_meta, resource = fix_transfer_encoding(file_meta, resource)
+ except Exception as e:
+ result['status'] = 'bad-gzip-encoding'
+ result['error_message'] = str(e)
+ return result
+
+ if not resource.body or file_meta['size_bytes'] == 0:
+ result['status'] = 'null-body'
+ return result
+
+ # here is where we split based on ingest type
+ html_ish_resource = bool(
+ "html" in file_meta['mimetype']
+ or "xhtml" in file_meta['mimetype']
+ or "application/xml" in file_meta['mimetype']
+ or "text/xml" in file_meta['mimetype']
+ )
+ if ingest_type == "pdf":
+ if html_ish_resource:
+ # Got landing page or similar. Some XHTML detected as "application/xml"
+ fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
+
+ result['html'] = fulltext_url
+ if not fulltext_url:
+ result['status'] = 'no-pdf-link'
+ return result
+ next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url')
+ assert next_url
+ next_url = clean_url(next_url)
+ print("[PARSE {:>6}] {} {}".format(
+ ingest_type,
+ fulltext_url.get('technique'),
+ next_url,
+ ),
+ file=sys.stderr)
+ if next_url in hops:
+ result['status'] = 'link-loop'
+ result['error_message'] = "repeated: {}".format(next_url)
+ return result
+ hops.append(next_url)
+ continue
+ else:
+ raise NotImplementedError()
+
# default is to NOT keep hopping
break
@@ -493,6 +482,11 @@ class IngestFileWorker(SandcrawlerWorker):
result['status'] = "max-hops-exceeded"
return result
+ # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
+ assert resource
+ assert resource.hit == True
+ assert resource.terminal_status_code in (200, 226)
+
if resource.terminal_url:
result['terminal'] = {
"terminal_url": resource.terminal_url,
@@ -501,35 +495,37 @@ class IngestFileWorker(SandcrawlerWorker):
"terminal_sha1hex": file_meta['sha1hex'],
}
- # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
- assert resource.hit == True
- assert resource.terminal_status_code in (200, 226)
-
result['file_meta'] = file_meta
result['cdx'] = cdx_to_dict(resource.cdx)
if resource.revisit_cdx:
result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
- # other failure cases
- if not resource.body or file_meta['size_bytes'] == 0:
- result['status'] = 'null-body'
- return result
-
- if not (resource.hit and file_meta['mimetype'] == "application/pdf"):
- result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
- return result
+ if ingest_type == "pdf":
+ if not file_meta['mimetype'] == "application/pdf":
+ result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
+ return result
+ else:
+ raise NotImplementedError()
- info = self.process_hit(resource, file_meta)
+ info = self.process_hit(ingest_type, resource, file_meta)
result.update(info)
result['status'] = "success"
result['hit'] = True
- print("[SUCCESS\t] sha1:{} grobid:{} pdfextract:{}".format(
- result.get('file_meta', {}).get('sha1hex'),
- result.get('grobid', {}).get('status_code'),
- result.get('pdf_meta', {}).get('status'),
- ),
- file=sys.stderr)
+ if ingest_type == "pdf":
+ print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format(
+ ingest_type,
+ result.get('file_meta', {}).get('sha1hex'),
+ result.get('grobid', {}).get('status_code'),
+ result.get('pdf_meta', {}).get('status'),
+ ),
+ file=sys.stderr)
+ else:
+ print("[SUCCESS {:>5}] sha1:{}".format(
+ ingest_type,
+ result.get('file_meta', {}).get('sha1hex'),
+ ),
+ file=sys.stderr)
return result