Diffstat (limited to 'python/sandcrawler/ingest.py')
-rw-r--r-- | python/sandcrawler/ingest.py | 446 |
1 files changed, 0 insertions, 446 deletions
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
deleted file mode 100644
index f4e78e4..0000000
--- a/python/sandcrawler/ingest.py
+++ /dev/null
@@ -1,446 +0,0 @@
-
-import sys
-import json
-import gzip
-import base64
-import requests
-from http.server import BaseHTTPRequestHandler, HTTPServer
-from collections import namedtuple
-
-from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult
-from sandcrawler.grobid import GrobidClient
-from sandcrawler.misc import gen_file_metadata, clean_url
-from sandcrawler.html import extract_fulltext_url
-from sandcrawler.workers import SandcrawlerWorker
-from sandcrawler.db import SandcrawlerPostgrestClient
-
-
-class IngestFileWorker(SandcrawlerWorker):
-    """
-    High level flow is to look in history first, then go to live web if
-    resource not found. Following redirects is treated as "fetching a
-    resource". Current version fetches a single resource; if it isn't a hit
-    but is an HTML 200, treats it as a landing page, tries to extract
-    fulltext link, then fetches that resource.
-
-        process(request, key=None) -> response
-            Does all the things!
-
-    Check existing processing (short circuit):
-
-        check_existing_ingest(base_url) -> ingest_file_result or none
-        process_existing(result) -> response
-            try fetching all the rows we want. if any don't exist, fetch the resource itself and call process_hit()
-
-    Fetch resource:
-
-        find_resource(url) -> ResourceResult
-
-    Process resource:
-
-        process_hit(ResourceResult) -> response
-        process_grobid(ResourceResult)
-    """
-
-    def __init__(self, sink=None, **kwargs):
-        super().__init__()
-
-        self.sink = sink
-        self.wayback_client = kwargs.get('wayback_client')
-        if not self.wayback_client:
-            self.wayback_client = WaybackClient()
-        self.spn_client = kwargs.get('spn_client')
-        if not self.spn_client:
-            self.spn_client = SavePageNowClient()
-        self.grobid_client = kwargs.get('grobid_client')
-        if not self.grobid_client:
-            self.grobid_client = GrobidClient()
-        self.pgrest_client = kwargs.get('pgrest_client')
-        if not self.pgrest_client:
-            self.pgrest_client = SandcrawlerPostgrestClient()
-        self.grobid_sink = kwargs.get('grobid_sink')
-
-        self.try_existing_ingest = kwargs.get('try_existing_ingest', False)
-        self.try_existing_grobid = kwargs.get('try_existing_grobid', True)
-        self.try_wayback = kwargs.get('try_wayback', True)
-        self.try_spn2 = kwargs.get('try_spn2', True)
-
-        self.base_url_blocklist = [
-            # temporary, until we implement specific fetch and 'petabox' output
-            "://archive.org/",
-            "://web.archive.org/web/",
-            "://openlibrary.org/",
-            "://fatcat.wiki/",
-
-            # Domain squats
-            "://bartandjones.com",
-            "://ijretm.com",
-            "://ijrcemas.com",
-            "://jist.net.in",
-            "://croisements-revue.org",
-
-            # all stubs/previews, not full papers
-            "://page-one.live.cf.public.springer.com",
-
-            # large datasets-only (no PDF expected)
-            "plutof.ut.ee/",
-            "www.gbif.org/",
-            "doi.pangaea.de/",
-            "www.plate-archive.org/",
-            "://doi.org/10.25642/ipk/gbis/",
-            "://apex.ipk-gatersleben.de/",
-
-            # Historical non-paper content:
-            "dhz.uni-passau.de/",   # newspapers
-            "digital.ucd.ie/",      # ireland national historical
-        ]
-
-        # these are special-case web domains for which we want SPN2 to not run
-        # a headless browser (brozzler), but instead simply run wget.
-        # the motivation could be to work around browser issues, or in the
-        # future possibly to increase download efficiency (wget/fetch being
-        # faster than browser fetch)
-        self.spn2_simple_get_domains = [
-        ]
-
-
-    def check_existing_ingest(self, base_url):
-        """
-        Check in sandcrawler-db (postgres) to see if we have already ingested
-        this URL (ingest file result table).
-
-        Returns existing row *if* found *and* we should use it, otherwise None.
-
-        Looks at existing ingest results and makes a decision based on, eg,
-        status and timestamp.
-        """
-        if not self.try_existing_ingest:
-            return None
-        existing = self.pgrest_client.get_ingest_file_result(base_url)
-        # TODO: filter on more flags?
-        if existing and existing['hit'] == True:
-            return existing
-        else:
-            return None
-
-    def find_resource(self, url, best_mimetype=None, force_recrawl=False):
-        """
-        Looks in wayback for a resource starting at the URL, following any
-        redirects. If a hit isn't found, try crawling with SPN.
-        """
-        via = "none"
-        resource = None
-
-        if url.startswith("http://web.archive.org/web/") or url.startswith("https://web.archive.org/web/"):
-            raise NotImplementedError("handling direct wayback links not supported yet")
-
-        if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"):
-            raise NotImplementedError("fetching from archive.org not implemented yet")
-
-        if self.try_wayback and not force_recrawl:
-            via = "wayback"
-            resource = self.wayback_client.lookup_resource(url, best_mimetype)
-
-        # check for "soft 404" conditions, where we should retry with live SPNv2
-        # TODO: could refactor these into the resource fetch things themselves?
-        soft404 = False
-        if resource and resource.hit and resource.terminal_url.endswith('/cookieAbsent'):
-            soft404 = True
-
-        if self.try_spn2 and (not resource or not resource.hit or soft404):
-            via = "spn2"
-            force_get = 0
-            for domain in self.spn2_simple_get_domains:
-                if domain in url:
-                    force_get = 1
-                    break
-            resource = self.spn_client.crawl_resource(url, self.wayback_client, force_get=force_get)
-        print("[FETCH {}\t] {}\t{}".format(
-                via,
-                resource.status,
-                resource.terminal_url or url),
-            file=sys.stderr)
-        return resource
-
-    def process_existing(self, request, result_row):
-        """
-        If we have an existing ingest file result, do any database fetches or
-        additional processing necessary to return a result.
-        """
-        raise NotImplementedError("process_existing() not tested or safe yet")
-        assert result_row['hit']
-        existing_file_meta = self.pgrest_client.get_grobid(result_row['terminal_sha1hex'])
-        existing_grobid = self.pgrest_client.get_grobid(result_row['terminal_sha1hex'])
-        existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], result_row['terminal_dt'])
-        if not (existing_file_meta and existing_grobid and existing_cdx):
-            raise NotImplementedError("partially-exsiting records not implemented yet")
-        result = {
-            'hit': result_row['hit'],
-            'status': "existing",
-            'request': request,
-            'grobid': existing_grobid,
-            'file_meta': existing_file_meta,
-            'cdx': existing_cdx,
-            'terminal': {
-                'terminal_url': result_row['terminal_url'],
-                'terminal_dt': result_row['terminal_dt'],
-                'terminal_status_code': result_row['terminal_status_code'],
-                'terminal_sha1hex': result_row['terminal_sha1hex'],
-            },
-        }
-        return result
-
-    def process_hit(self, resource, file_meta):
-        """
-        Run all the necessary processing for a new/fresh ingest hit.
-        """
-        return {
-            'grobid': self.process_grobid(resource, file_meta),
-        }
-
-    def process_grobid(self, resource, file_meta):
-        """
-        Submits to resource body to GROBID for processing.
-
-        TODO: By default checks sandcrawler-db for an existing row first, then
-        decide if we should re-process
-        """
-        if self.try_existing_grobid:
-            existing = self.pgrest_client.get_grobid(file_meta['sha1hex'])
-            if existing:
-                print("found existing GROBID result", file=sys.stderr)
-                return existing
-
-        # Need to actually processes
-        result = self.grobid_client.process_fulltext(resource.body)
-        if self.grobid_sink:
-            # extra fields for GROBID kafka messages
-            result['file_meta'] = file_meta
-            result['key'] = result['file_meta']['sha1hex']
-            self.grobid_sink.push_record(result.copy())
-        if result['status'] == "success":
-            metadata = self.grobid_client.metadata(result)
-            if metadata:
-                result['metadata'] = self.grobid_client.metadata(result)
-                result['fatcat_release'] = result['metadata'].pop('fatcat_release', None)
-                result['grobid_version'] = result['metadata'].pop('grobid_version', None)
-        result.pop('tei_xml', None)
-        result.pop('file_meta', None)
-        result.pop('key', None)
-        return result
-
-    def timeout_response(self, task):
-        print("[TIMEOUT]", file=sys.stderr)
-        return dict(
-            request=task,
-            hit=False,
-            status="timeout",
-            error_message="ingest worker internal timeout",
-        )
-
-    def want(self, request):
-        if not request.get('ingest_type') in ('file', 'pdf'):
-            return False
-        return True
-
-    def process(self, request, key=None):
-
-        # backwards compatibility
-        if request.get('ingest_type') in ('file', None):
-            request['ingest_type'] = 'pdf'
-
-        # for now, only pdf ingest is implemented
-        if not 'ingest_type' in request:
-            request['ingest_type'] = "pdf"
-        assert request.get('ingest_type') == "pdf"
-        ingest_type = request.get('ingest_type')
-
-        # parse/clean URL
-        # note that we pass through the original/raw URL, and that is what gets
-        # persisted in database table
-        base_url = clean_url(request['base_url'])
-
-        force_recrawl = bool(request.get('force_recrawl', False))
-
-        for block in self.base_url_blocklist:
-            if block in base_url:
-                print("[SKIP {}\t] {}".format(ingest_type, base_url), file=sys.stderr)
-                return dict(request=request, hit=False, status="skip-url-blocklist")
-
-        print("[INGEST {}\t] {}".format(ingest_type, base_url), file=sys.stderr)
-
-        best_mimetype = None
-        if ingest_type == "pdf":
-            best_mimetype = "application/pdf"
-
-        existing = self.check_existing_ingest(base_url)
-        if existing:
-            return self.process_existing(request, existing)
-
-        result = dict(request=request, hit=False)
-
-        next_url = base_url
-        hops = [base_url]
-        self.max_hops = 6
-
-
-        while len(hops) <= self.max_hops:
-
-            result['hops'] = hops
-            try:
-                resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl)
-            except SavePageNowError as e:
-                result['status'] = 'spn2-error'
-                result['error_message'] = str(e)[:1600]
-                return result
-            except PetaboxError as e:
-                result['status'] = 'petabox-error'
-                result['error_message'] = str(e)[:1600]
-                return result
-            except CdxApiError as e:
-                result['status'] = 'cdx-error'
-                result['error_message'] = str(e)[:1600]
-                return result
-            except WaybackError as e:
-                result['status'] = 'wayback-error'
-                result['error_message'] = str(e)[:1600]
-                return result
-            except NotImplementedError as e:
-                result['status'] = 'not-implemented'
-                result['error_message'] = str(e)[:1600]
-                return result
-
-            if not resource.hit:
-                result['status'] = resource.status
-                if resource.terminal_dt and resource.terminal_status_code:
-                    result['terminal'] = {
-                        "terminal_url": resource.terminal_url,
-                        "terminal_dt": resource.terminal_dt,
-                        "terminal_status_code": resource.terminal_status_code,
-                    }
-                    if resource.terminal_url not in result['hops']:
-                        result['hops'].append(resource.terminal_url)
-                return result
-
-            if not resource.body:
-                result['status'] = 'null-body'
-                return result
-            file_meta = gen_file_metadata(resource.body)
-
-            # crude handling of content-encoding; wayback fetch library usually
-            # (and should always?) handle this
-            if file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
-                print("transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr)
-                try:
-                    inner_body = gzip.decompress(resource.body)
-                except Exception as e:
-                    result['status'] = 'bad-gzip-encoding'
-                    result['error_message'] = str(e)
-                    return result
-                if not inner_body:
-                    result['status'] = 'null-body'
-                    return result
-                resource = ResourceResult(
-                    body=inner_body,
-                    # copy all other fields
-                    start_url=resource.start_url,
-                    hit=resource.hit,
-                    status=resource.status,
-                    terminal_url=resource.terminal_url,
-                    terminal_dt=resource.terminal_dt,
-                    terminal_status_code=resource.terminal_status_code,
-                    cdx=resource.cdx,
-                    revisit_cdx=resource.revisit_cdx,
-                )
-                file_meta = gen_file_metadata(resource.body)
-
-            if "html" in file_meta['mimetype'] or "xhtml" in file_meta['mimetype'] or "application/xml" in file_meta['mimetype']:
-                # Got landing page or similar. Some XHTML detected as "application/xml"
-                if resource.terminal_dt:
-                    result['terminal'] = {
-                        "terminal_url": resource.terminal_url,
-                        "terminal_dt": resource.terminal_dt,
-                        "terminal_status_code": resource.terminal_status_code,
-                    }
-                fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
-
-                result['html'] = fulltext_url
-                if not fulltext_url:
-                    result['status'] = 'no-pdf-link'
-                    return result
-                next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url')
-                assert next_url
-                next_url = clean_url(next_url)
-                print("[PARSE\t] {}\t{}".format(
-                        fulltext_url.get('technique'),
-                        next_url,
-                    ),
-                    file=sys.stderr)
-                if next_url in hops:
-                    result['status'] = 'link-loop'
-                    result['error_message'] = "repeated: {}".format(next_url)
-                    return result
-                hops.append(next_url)
-                continue
-
-            # default is to NOT keep hopping
-            break
-
-        if len(hops) >= self.max_hops:
-            result['status'] = "max-hops-exceeded"
-            return result
-
-        if resource.terminal_dt:
-            result['terminal'] = {
-                "terminal_url": resource.terminal_url,
-                "terminal_dt": resource.terminal_dt,
-                "terminal_status_code": resource.terminal_status_code,
-                "terminal_sha1hex": file_meta['sha1hex'],
-            }
-
-        # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
-        assert resource.hit == True
-        assert resource.terminal_status_code in (200, 226)
-
-        result['file_meta'] = file_meta
-        result['cdx'] = cdx_to_dict(resource.cdx)
-        if resource.revisit_cdx:
-            result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
-
-        # other failure cases
-        if not resource.body or file_meta['size_bytes'] == 0:
-            result['status'] = 'null-body'
-            return result
-
-        if not (resource.hit and file_meta['mimetype'] == "application/pdf"):
-            result['status'] = "wrong-mimetype"  # formerly: "other-mimetype"
-            return result
-
-        info = self.process_hit(resource, file_meta)
-        result.update(info)
-
-        result['status'] = "success"
-        result['hit'] = True
-        print("[SUCCESS\t] sha1:{} grobid:{}".format(
-                result.get('file_meta', {}).get('sha1hex'),
-                result.get('grobid', {}).get('status_code'),
-            ),
-            file=sys.stderr)
-        return result


-class IngestFileRequestHandler(BaseHTTPRequestHandler):
-    def do_POST(self):
-        if self.path != "/ingest":
-            self.send_response(404)
-            self.end_headers()
-            self.wfile.write("404: Not Found")
-            return
-        length = int(self.headers.get('content-length'))
-        request = json.loads(self.rfile.read(length).decode('utf-8'))
-        print("Got request: {}".format(request))
-        ingester = IngestFileWorker()
-        result = ingester.process(request)
-        self.send_response(200)
-        self.end_headers()
-        self.wfile.write(json.dumps(result))