Diffstat (limited to 'python/sandcrawler')
-rw-r--r-- | python/sandcrawler/__init__.py | 10
-rw-r--r-- | python/sandcrawler/db.py | 418
-rw-r--r-- | python/sandcrawler/grobid.py | 130
-rw-r--r-- | python/sandcrawler/html.py | 348
-rw-r--r-- | python/sandcrawler/html_ingest.py | 441
-rw-r--r-- | python/sandcrawler/html_metadata.py | 857
-rw-r--r-- | python/sandcrawler/ia.py | 1138
-rw-r--r-- | python/sandcrawler/ingest.py | 833
-rw-r--r-- | python/sandcrawler/minio.py | 99
-rw-r--r-- | python/sandcrawler/misc.py | 222
-rw-r--r-- | python/sandcrawler/pdfextract.py | 470
-rw-r--r-- | python/sandcrawler/pdftrio.py | 130
-rw-r--r-- | python/sandcrawler/persist.py | 584
-rw-r--r-- | python/sandcrawler/workers.py | 625
-rw-r--r-- | python/sandcrawler/xml.py | 7
15 files changed, 6312 insertions, 0 deletions
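As a rough orientation to how the modules added in this diff fit together, here is a minimal sketch (not part of the commit). It assumes a reachable GROBID service and PostgREST endpoint at the default URLs hard-coded in grobid.py and db.py; "paper.pdf" is a hypothetical input file, and the file_meta field names follow how they are used elsewhere in the diff.

```python
# Orientation sketch only; exports below come from sandcrawler/__init__.py in this diff.
from sandcrawler import GrobidClient, SandcrawlerPostgrestClient, gen_file_metadata

with open("paper.pdf", "rb") as f:   # hypothetical local PDF
    blob = f.read()

# basic file metadata (sha1hex, size_bytes, mimetype, ...)
file_meta = gen_file_metadata(blob)

# send the PDF to GROBID; result has status_code, status, and tei_xml on success
grobid = GrobidClient(host_url="http://grobid.qa.fatcat.wiki")
result = grobid.process_fulltext(blob)
if result["status"] == "success":
    meta = grobid.metadata(result)
    print(meta["biblio"].get("title"))

# read-only lookup against the sandcrawler database via PostgREST
db = SandcrawlerPostgrestClient(api_url="http://wbgrp-svc506.us.archive.org:3030")
existing = db.get_grobid(file_meta["sha1hex"])
```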
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py new file mode 100644 index 0000000..e461462 --- /dev/null +++ b/python/sandcrawler/__init__.py @@ -0,0 +1,10 @@ + +from .grobid import GrobidClient, GrobidWorker, GrobidBlobWorker +from .pdftrio import PdfTrioClient, PdfTrioWorker, PdfTrioBlobWorker +from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime, clean_url +from .workers import KafkaSink, KafkaCompressSink, JsonLinePusher, CdxLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper +from .ia import WaybackClient, WaybackError, WaybackContentError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow +from .ingest import IngestFileWorker +from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker, PersistPdfTextWorker, PersistThumbnailWorker +from .db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient +from .pdfextract import PdfExtractWorker, PdfExtractBlobWorker diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py new file mode 100644 index 0000000..e60b310 --- /dev/null +++ b/python/sandcrawler/db.py @@ -0,0 +1,418 @@ + +import json +import datetime +from typing import Optional + +import psycopg2 +import psycopg2.extras +import requests + +class SandcrawlerPostgrestClient: + + def __init__(self, api_url="http://wbgrp-svc506.us.archive.org:3030", **kwargs): + self.api_url = api_url + + def get_cdx(self, url): + resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url)) + resp.raise_for_status() + return resp.json() or None + + def get_grobid(self, sha1): + resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1)) + resp.raise_for_status() + resp = resp.json() + if resp: + return resp[0] + else: + return None + + def get_pdftrio(self, sha1): + resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.'+sha1)) + resp.raise_for_status() + resp = resp.json() + if resp: + return resp[0] + else: + return None + + def get_pdf_meta(self, sha1): + resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.'+sha1)) + resp.raise_for_status() + resp = resp.json() + if resp: + return resp[0] + else: + return None + + def get_html_meta(self, sha1hex: str) -> Optional[dict]: + resp = requests.get( + self.api_url + "/html_meta", + params=dict(sha1hex=f"eq.{sha1hex}"), + ) + resp.raise_for_status() + resp_json = resp.json() + if resp_json: + return resp_json[0] + else: + return None + + def get_file_meta(self, sha1): + resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1)) + resp.raise_for_status() + resp = resp.json() + if resp: + return resp[0] + else: + return None + + def get_ingest_file_result(self, ingest_type: str, url: str) -> Optional[dict]: + resp = requests.get( + self.api_url + "/ingest_file_result", + params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"), + ) + resp.raise_for_status() + resp_json = resp.json() + if resp_json: + return resp_json[0] + else: + return None + + def get_crossref(self, doi): + resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.'+doi)) + resp.raise_for_status() + resp = resp.json() + if resp: + return resp[0] + else: + return None + +class SandcrawlerPostgresClient: + + def __init__(self, db_url, **kwargs): + self.conn = psycopg2.connect(db_url) + + def 
cursor(self): + return self.conn.cursor() + + def commit(self): + return self.conn.commit() + + def _inserts_and_updates(self, resp, on_conflict): + resp = [int(r[0]) for r in resp] + inserts = len([r for r in resp if r == 0]) + if on_conflict == "update": + updates = len([r for r in resp if r != 0]) + else: + updates = 0 + return (inserts, updates) + + def insert_cdx(self, cur, batch, on_conflict="nothing"): + sql = """ + INSERT INTO + cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) + VALUES %s + ON CONFLICT ON CONSTRAINT cdx_pkey DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + + batch = [d for d in batch if d.get('warc_path')] + if not batch: + return (0, 0) + batch = [(d['url'], + d['datetime'], + d['sha1hex'], + d['mimetype'], + d['warc_path'], + int(d['warc_csize']), + int(d['warc_offset'])) + for d in batch] + # filter out duplicate rows by key (url, datetime) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1])] = b + batch = list(batch_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) + + def insert_file_meta(self, cur, batch, on_conflict="nothing"): + sql = """ + INSERT INTO + file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype) + VALUES %s + ON CONFLICT (sha1hex) DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + sha256hex=EXCLUDED.sha256hex, + md5hex=EXCLUDED.md5hex, + size_bytes=EXCLUDED.size_bytes, + mimetype=EXCLUDED.mimetype + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + batch = [(d['sha1hex'], + d['sha256hex'], + d['md5hex'], + int(d['size_bytes']), + d['mimetype']) + for d in batch] + # filter out duplicate rows by key (sha1hex) + batch_dict = dict() + for b in batch: + batch_dict[b[0]] = b + batch = list(batch_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) + + def insert_grobid(self, cur, batch, on_conflict="nothing"): + sql = """ + INSERT INTO + grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata) + VALUES %s + ON CONFLICT (sha1hex) DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + grobid_version=EXCLUDED.grobid_version, + status_code=EXCLUDED.status_code, + status=EXCLUDED.status, + fatcat_release=EXCLUDED.fatcat_release, + updated=EXCLUDED.updated, + metadata=EXCLUDED.metadata + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + for r in batch: + if r.get('metadata'): + # sometimes these are only in metadata; shouldn't pass through + # though (to save database space) + dupe_fields = ('fatcat_release', 'grobid_version') + for k in dupe_fields: + if not k in r: + r[k] = r['metadata'].get(k) + r['metadata'].pop(k, None) + r['metadata'] = json.dumps(r['metadata'], sort_keys=True) + batch = [(d['key'], + d.get('grobid_version') or None, + d['status_code'], + d['status'], + d.get('fatcat_release') or None, + d.get('updated') or datetime.datetime.now(), + d.get('metadata') or None , + ) + for d in batch] + # filter out duplicate rows by key (sha1hex) + batch_dict = dict() + for b in 
batch: + batch_dict[b[0]] = b + batch = list(batch_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) + + def insert_pdf_meta(self, cur, batch, on_conflict="nothing"): + """ + batch elements are expected to have .to_sql_tuple() method + """ + sql = """ + INSERT INTO + pdf_meta (sha1hex, updated, status, has_page0_thumbnail, page_count, word_count, page0_height, page0_width, permanent_id, pdf_created, pdf_version, metadata) + VALUES %s + ON CONFLICT (sha1hex) DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + updated=EXCLUDED.updated, + status=EXCLUDED.status, + has_page0_thumbnail=EXCLUDED.has_page0_thumbnail, + page_count=EXCLUDED.page_count, + word_count=EXCLUDED.word_count, + page0_height=EXCLUDED.page0_height, + page0_width=EXCLUDED.page0_width, + permanent_id=EXCLUDED.permanent_id, + pdf_created=EXCLUDED.pdf_created, + pdf_version=EXCLUDED.pdf_version, + metadata=EXCLUDED.metadata + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + batch = [d.to_sql_tuple() for d in batch] + # filter out duplicate rows by key (sha1hex) + batch_dict = dict() + for b in batch: + batch_dict[b[0]] = b + batch = list(batch_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) + + def insert_html_meta(self, cur, batch, on_conflict="nothing"): + """ + batch elements are expected to have .to_sql_tuple() method + """ + sql = """ + INSERT INTO + html_meta (sha1hex, updated, status, scope, has_teixml, has_thumbnail, word_count, biblio, resources) + VALUES %s + ON CONFLICT (sha1hex) DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + updated=EXCLUDED.updated, + status=EXCLUDED.status, + scope=EXCLUDED.scope, + has_teixml=EXCLUDED.has_teixml, + has_thumbnail=EXCLUDED.has_thumbnail, + word_count=EXCLUDED.word_count, + biblio=EXCLUDED.biblio, + resources=EXCLUDED.resources + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + batch = [d.to_sql_tuple() for d in batch] + # filter out duplicate rows by key (sha1hex) + batch_dict = dict() + for b in batch: + batch_dict[b[0]] = b + batch = list(batch_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) + + def insert_pdftrio(self, cur, batch, on_conflict="nothing"): + sql = """ + INSERT INTO + pdftrio (sha1hex, updated, status_code, status, pdftrio_version, + models_date, ensemble_score, bert_score, linear_score, + image_score) + VALUES %s + ON CONFLICT (sha1hex) DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + updated=EXCLUDED.updated, + status_code=EXCLUDED.status_code, + status=EXCLUDED.status, + pdftrio_version=EXCLUDED.pdftrio_version, + models_date=EXCLUDED.models_date, + ensemble_score=EXCLUDED.ensemble_score, + bert_score=EXCLUDED.bert_score, + linear_score=EXCLUDED.linear_score, + image_score=EXCLUDED.image_score + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + batch = [ + ( + d['key'], + d.get('updated') or datetime.datetime.now(), + 
d['status_code'], + d['status'], + d.get('versions', {}).get('pdftrio_version') or None, + d.get('versions', {}).get('models_date') or None, + d.get('ensemble_score'), + d.get('bert_score'), + d.get('linear_score'), + d.get('image_score'), + ) + for d in batch] + # filter out duplicate rows by key (sha1hex) + batch_dict = dict() + for b in batch: + batch_dict[b[0]] = b + batch = list(batch_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) + + def insert_ingest_request(self, cur, batch, on_conflict="nothing"): + sql = """ + INSERT INTO + ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request) + VALUES %s + ON CONFLICT ON CONSTRAINT ingest_request_pkey DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + for r in batch: + # in case these fields were already packed into 'request' + extra = r.get('request', {}) + for k in ('ext_ids', 'fatcat_release', 'edit_extra', 'rel'): + if r.get(k): + extra[k] = r[k] + if extra: + r['extra'] = json.dumps(extra, sort_keys=True) + batch = [(d['link_source'], + d['link_source_id'], + d['ingest_type'], + d['base_url'], + d.get('ingest_request_source'), + d.get('release_stage') or None, + d.get('extra') or None, + ) + for d in batch] + # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1], b[2], b[3])] = b + batch = list(batch_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) + + def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"): + sql = """ + INSERT INTO + ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex) + VALUES %s + ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + updated=now(), + hit=EXCLUDED.hit, + status=EXCLUDED.status, + terminal_url=EXCLUDED.terminal_url, + terminal_dt=EXCLUDED.terminal_dt, + terminal_status_code=EXCLUDED.terminal_status_code, + terminal_sha1hex=EXCLUDED.terminal_sha1hex + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + batch = [(d['ingest_type'], + d['base_url'], + bool(d['hit']), + d['status'], + d.get('terminal_url'), + d.get('terminal_dt'), + d.get('terminal_status_code'), + d.get('terminal_sha1hex'), + ) + for d in batch] + # filter out duplicate rows by key (ingest_type, base_url) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1])] = b + batch = list(batch_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py new file mode 100644 index 0000000..b4215dc --- /dev/null +++ b/python/sandcrawler/grobid.py @@ -0,0 +1,130 @@ + +import requests + +from grobid2json import teixml2json +from .workers import SandcrawlerWorker, SandcrawlerFetchWorker +from .misc import gen_file_metadata + +class GrobidClient(object): + + def __init__(self, host_url="http://grobid.qa.fatcat.wiki", **kwargs): + 
self.host_url = host_url + self.consolidate_mode = int(kwargs.get('consolidate_mode', 0)) + + def process_fulltext(self, blob, consolidate_mode=None): + """ + Returns dict with keys: + - status_code + - status (slug) + - error_msg (if status == 'error') + - tei_xml (if status is 200) + + TODO: persist connection for performance? + """ + assert blob + + if consolidate_mode == None: + consolidate_mode = self.consolidate_mode + + try: + grobid_response = requests.post( + self.host_url + "/api/processFulltextDocument", + files={ + 'input': blob, + 'consolidateHeader': self.consolidate_mode, + 'consolidateCitations': 0, # too expensive for now + 'includeRawCitations': 1, + }, + timeout=180.0, + ) + except requests.Timeout: + return { + 'status': 'error-timeout', + 'status_code': -4, # heritrix3 "HTTP timeout" code + 'error_msg': 'GROBID request (HTTP POST) timeout', + } + + info = dict( + status_code=grobid_response.status_code, + ) + if grobid_response.status_code == 200: + info['status'] = 'success' + info['tei_xml'] = grobid_response.text + if len(info['tei_xml']) > 12000000: + # XML is larger than Kafka message size, and much larger than + # an article in general; bail out + info['status'] = 'error' + info['error_msg'] = "response XML too large: {} bytes".format(len(info['tei_xml'])) + info.pop('tei_xml') + else: + # response.text is .content decoded as utf-8 + info['status'] = 'error' + info['error_msg'] = grobid_response.text[:10000] + return info + + def metadata(self, result): + if result['status'] != 'success': + return None + tei_json = teixml2json(result['tei_xml'], encumbered=False) + meta = dict() + biblio = dict() + for k in ('title', 'authors', 'journal', 'date', 'doi', ): + if tei_json.get(k): + biblio[k] = tei_json[k] + meta['biblio'] = biblio + for k in ('grobid_version', 'grobid_timestamp', 'fatcat_release', 'language_code'): + if tei_json.get(k): + meta[k] = tei_json[k] + return meta + +class GrobidWorker(SandcrawlerFetchWorker): + + def __init__(self, grobid_client, wayback_client=None, sink=None, **kwargs): + super().__init__(wayback_client=wayback_client) + self.grobid_client = grobid_client + self.sink = sink + self.consolidate_mode = 0 + + def timeout_response(self, task): + default_key = task['sha1hex'] + return dict( + status="error-timeout", + error_msg="internal GROBID worker timeout", + source=task, + key=default_key, + ) + + def process(self, record, key=None): + default_key = record['sha1hex'] + + fetch_result = self.fetch_blob(record) + if fetch_result['status'] != 'success': + return fetch_result + blob = fetch_result['blob'] + + result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode) + result['file_meta'] = gen_file_metadata(blob) + result['source'] = record + result['key'] = result['file_meta']['sha1hex'] + return result + +class GrobidBlobWorker(SandcrawlerWorker): + """ + This is sort of like GrobidWorker, except it receives blobs directly, + instead of fetching blobs from some remote store. 
+ """ + + def __init__(self, grobid_client, sink=None, **kwargs): + super().__init__() + self.grobid_client = grobid_client + self.sink = sink + self.consolidate_mode = 0 + + def process(self, blob, key=None): + if not blob: + return None + result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode) + result['file_meta'] = gen_file_metadata(blob) + result['key'] = result['file_meta']['sha1hex'] + return result + diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py new file mode 100644 index 0000000..cd0a8e8 --- /dev/null +++ b/python/sandcrawler/html.py @@ -0,0 +1,348 @@ + +import re +import sys +import json +import urllib.parse + +from bs4 import BeautifulSoup + +RESEARCHSQUARE_REGEX = re.compile(r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"') +IEEEXPLORE_REGEX = re.compile(r'"pdfPath":"(/.*?\.pdf)"') +OVID_JOURNAL_URL_REGEX = re.compile(r'journalURL = "(http.*)";') +SCIENCEDIRECT_BOUNCE_URL_REGEX = re.compile(r"window.location = '(http.*)';") + + +def extract_fulltext_url(html_url, html_body): + """ + Takes an HTML document (and URL), assumed to be a landing page, and tries + to find a fulltext PDF url. + + On error, or if fails to extract a URL, returns an empty dict. + """ + + host_prefix = '/'.join(html_url.split('/')[:3]) + try: + soup = BeautifulSoup(html_body, 'html.parser') + except TypeError as te: + print(f"{te} (url={html_url})", file=sys.stderr) + return dict() + except UnboundLocalError as ule: + print(f"{ule} (url={html_url})", file=sys.stderr) + return dict() + + ### General Tricks ### + + # highwire-style meta tag + meta = soup.find('meta', attrs={"name":"citation_pdf_url"}) + if not meta: + meta = soup.find('meta', attrs={"name":"bepress_citation_pdf_url"}) + if not meta: + meta = soup.find('meta', attrs={"name":"wkhealth_pdf_url"}) + if not meta: + # researchgate does this; maybe others also? + meta = soup.find('meta', attrs={"property":"citation_pdf_url"}) + if not meta: + meta = soup.find('meta', attrs={"name":"eprints.document_url"}) + # if tag is only partially populated + if meta and not meta.get('content'): + meta = None + # wiley has a weird almost-blank page we don't want to loop on + if meta and not "://onlinelibrary.wiley.com/doi/pdf/" in html_url: + url = meta['content'].strip() + if '://doi.org/' in url: + print(f"\tdoi.org in citation_pdf_url (loop?): {url}", file=sys.stderr) + elif url.startswith('/'): + if host_prefix+url == html_url: + print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr) + else: + return dict(pdf_url=host_prefix+url, technique='citation_pdf_url') + elif url.startswith('http'): + if url == html_url: + print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr) + else: + return dict(pdf_url=url, technique='citation_pdf_url') + else: + print("\tmalformed citation_pdf_url? 
{}".format(url), file=sys.stderr) + + meta = soup.find('meta', attrs={"name":"generator"}) + meta_generator = None + if meta and meta.get('content'): + meta_generator = meta['content'].strip() + + ### Publisher/Platform Specific ### + + # research square (researchsquare.com) + if 'researchsquare.com/article/' in html_url: + # JSON in body with a field like: + # "url":"https://assets.researchsquare.com/files/4a57970e-b002-4608-b507-b95967649483/v2/Manuscript.pdf" + m = RESEARCHSQUARE_REGEX.search(html_body.decode('utf-8')) + if m: + url = m.group(1) + assert len(url) < 4096 + return dict(release_stage="manuscript", pdf_url=url, technique='publisher') + + # elseiver linking hub + # https://linkinghub.elsevier.com/retrieve/pii/S1569199319308975 + if '://linkinghub.elsevier.com/retrieve/pii/' in html_url: + # <input type="hidden" name="redirectURL" value="http%3A%2F%2Fcysticfibrosisjournal.com%2Fretrieve%2Fpii%2FS1569199319308975" id="redirectURL"/> + redirect = soup.find("input", attrs={"name": "redirectURL"}) + if redirect: + url = redirect['value'].strip() + if 'http' in url: + url = urllib.parse.unquote(url) + # drop any the query parameter + url = url.split('?via')[0] + return dict(next_url=url, technique="elsevier-linkinghub") + + # sciencedirect PDF URL extract + # https://www.sciencedirect.com/science/article/pii/S0169204621000670 + if 'sciencedirect.com/science/article/pii/' in html_url and not html_url.endswith(".pdf"): + json_tag = soup.find("script", attrs={"type": "application/json", "data-iso-key": "_0"}) + url = None + if json_tag: + try: + json_text = json_tag.string + json_meta = json.loads(json_text) + pdf_meta = json_meta['article']['pdfDownload']['urlMetadata'] + # https://www.sciencedirect.com/science/article/pii/S0169204621000670/pdfft?md5=c4a83d06b334b627ded74cf9423bfa56&pid=1-s2.0-S0169204621000670-main.pdf + url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams']['md5'] + "&pid=" + pdf_meta['queryParams']['pid'] + except (KeyError, TypeError, json.JSONDecodeError): + pass + if url: + return dict(pdf_url=url, technique="sciencedirect-munge-json") + + # sciencedirect PDF bounce page + # https://www.sciencedirect.com/science/article/pii/S2590109519300424/pdfft?md5=854f43a44de186eb58674b8e20631691&pid=1-s2.0-S2590109519300424-main.pdf + if '://www.sciencedirect.com/' in html_url and html_url.endswith(".pdf"): + # window.location = 'https://pdf.sciencedirectassets.com/320270/AIP/1-s2.0-S2590109519300424/main.pdf?X-Amz-Security-Token=[...]&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20200110T210936Z&X-Amz-SignedHeaders=host&X-Amz-Expires=300&X-Amz-Credential=ASIAQ3PHCVTY23CMDBNC%2F20200110%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=[...]&hash=[...]&host=[...]&pii=S2590109519300424&tid=spdf-74468ebd-6be6-43ac-b294-ced86e8eea58&sid=[...]&type=client'; + m = SCIENCEDIRECT_BOUNCE_URL_REGEX.search(html_body.decode('utf-8')) + if m: + url = m.group(1) + assert len(url) < 4000 + return dict(pdf_url=url, technique="sciencedirect-bounce") + + # ieeexplore.ieee.org + # https://ieeexplore.ieee.org/document/8730316 + if '://ieeexplore.ieee.org/document/' in html_url: + # JSON in body with a field like: + # "pdfPath":"/iel7/6287639/8600701/08730316.pdf", + m = IEEEXPLORE_REGEX.search(html_body.decode('utf-8')) + if m: + url = m.group(1) + assert len(url) < 4096 + return dict(release_stage="published", pdf_url=host_prefix+url, technique="ieeexplore") + # https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=8730313 + if 
'://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber' in html_url: + # HTML iframe like: + # <iframe src="http://web.archive.org/web/20191026011528if_/https://ieeexplore.ieee.org/ielx7/6287639/8600701/08730313.pdf?tp=&arnumber=8730313&isnumber=8600701&ref=" frameborder="0"></iframe> + iframe = soup.find("iframe") + if iframe and '.pdf' in iframe['src']: + return dict(pdf_url=iframe['src'], technique="iframe") + + # https://insights.ovid.com/crossref?an=00042307-202001000-00013 + # Ovid is some kind of landing page bounce portal tracking run-around. + # Can extract actual journal URL from javascript blob in the HTML + if '://insights.ovid.com/crossref' in html_url: + # var journalURL = "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"; + m = OVID_JOURNAL_URL_REGEX.search(html_body.decode('utf-8')) + if m: + url = m.group(1) + assert len(url) < 4096 + return dict(next_url=url, technique='ovid') + + # osf.io + # https://osf.io/8phvx/ + # https://osf.io/preprints/socarxiv/8phvx/ + # wow, they ship total javascript crud! going to just guess download URL + # based on URL for now. Maybe content type header would help? + OSF_DOMAINS = [ + '://osf.io/', + '://biohackrxiv.org/', + '://psyarxiv.com/', + '://arabixiv.org/', + '://engrxiv.org/', + '://edarxiv.org//', + '://ecsarxiv.org/', + '://ecoevorxiv.org/', + '://frenxiv.org/', + '://indiarxiv.org/', + '://mindrxiv.org/', + '://mediarxiv.org/', + '://paleorxiv.org/', + '://thesiscommons.org/', + ] + for domain in OSF_DOMAINS: + if domain in html_url and (len(html_url.split('/')) in [4,5] or '/preprints/' in html_url) and '/download' not in html_url: + if not html_url.endswith("/"): + next_url = html_url+"/download" + else: + next_url = html_url+"download" + return dict(next_url=next_url, technique='osf-by-url') + + # wiley + # https://onlinelibrary.wiley.com/doi/pdf/10.1111/1467-923X.12787 + if "://onlinelibrary.wiley.com/doi/pdf/" in html_url: + if b"/doi/pdfdirect/" in html_body: + next_url = html_url.replace('/doi/pdf/', '/doi/pdfdirect/') + return dict(next_url=next_url, technique='wiley-pdfdirect') + + # arxiv abstract pages + if "://arxiv.org/abs/" in html_url: + url = html_url.replace("/abs/", "/pdf/") + return dict(pdf_url=url, technique='arxiv-url') + + # american archivist (OA) + # https://americanarchivist.org/doi/abs/10.17723/aarc.62.2.j475270470145630 + if "://americanarchivist.org/doi/" in html_url and not "/doi/pdf" in html_url: + # use a more aggressive direct guess to avoid rate-limiting... + if "/doi/10." 
in html_url: + url = html_url.replace("/doi/10.", "/doi/pdf/10.") + return dict(pdf_url=url, technique='archivist-url') + # <a href="/doi/pdf/10.17723/aarc.62.2.j475270470145630" target="_blank"> + hrefs = soup.find_all('a', attrs={"target":"_blank"}) + for href in hrefs: + url = href['href'].strip() + if "/doi/pdf/" in url: + if url.startswith('http'): + return dict(pdf_url=url, technique='publisher-href') + elif url.startswith('/'): + return dict(pdf_url=host_prefix+url, technique='publisher-href') + + # protocols.io + # https://www.protocols.io/view/flow-cytometry-protocol-mgdc3s6 + if "://www.protocols.io/view/" in html_url and not html_url.endswith(".pdf"): + url = html_url + ".pdf" + return dict(pdf_url=url, technique='protocolsio-url') + + # degruyter.com + # https://www.degruyter.com/view/books/9783486594621/9783486594621-009/9783486594621-009.xml + if "://www.degruyter.com/view/" in html_url and html_url.endswith(".xml"): + url = html_url.replace('/view/', '/downloadpdf/').replace('.xml', '.pdf') + return dict(pdf_url=url, technique='degruyter-url') + + # journals.lww.com (Wolters Kluwer) + # https://journals.lww.com/spinejournal/Abstract/publishahead/Making_the_Most_of_Systematic_Reviews_and.94318.aspx + # DISABLED: they seem to redirect our crawler back to a "Fulltext" page and + # we never get the content. + if "://journals.lww.com/" in html_url and False: + # data-pdf-url="https://pdfs.journals.lww.com/spinejournal/9000/00000/Making_the_Most_of_Systematic_Reviews_and.94318.pdf?token=method|ExpireAbsolute;source|Journals;ttl|1582413672903;payload|mY8D3u1TCCsNvP5E421JYK6N6XICDamxByyYpaNzk7FKjTaa1Yz22MivkHZqjGP4kdS2v0J76WGAnHACH69s21Csk0OpQi3YbjEMdSoz2UhVybFqQxA7lKwSUlA502zQZr96TQRwhVlocEp/sJ586aVbcBFlltKNKo+tbuMfL73hiPqJliudqs17cHeLcLbV/CqjlP3IO0jGHlHQtJWcICDdAyGJMnpi6RlbEJaRheGeh5z5uvqz3FLHgPKVXJzdiVgCTnUeUQFYzcJRFhNtc2gv+ECZGji7HUicj1/6h85Y07DBRl1x2MGqlHWXUawD;hash|6cqYBa15ZK407m4VhFfJLw==" + for line in html_body.split(b'\n'): + if b"data-pdf-url=" in line: + line = line.decode('utf-8') + url = line.strip().replace('data-pdf-url=', '').replace('"', '') + if url.startswith('http') and 'pdfs.journals.lww.com' in url: + return dict(pdf_url=url, technique='journals.lww.com-jsvar') + + # www.ahajournals.org + # https://www.ahajournals.org/doi/10.1161/circ.110.19.2977 + if "://www.ahajournals.org/doi/" in html_url and not '/doi/pdf/' in html_url: + # <a href="/doi/pdf/10.1161/circ.110.19.2977?download=true">PDF download</a> + if b'/doi/pdf/10.' in html_body: + url = html_url.replace('/doi/10.', '/doi/pdf/10.') + url = url + "?download=true" + return dict(pdf_url=url, technique='ahajournals-url') + + # ehp.niehs.nih.gov + # https://ehp.niehs.nih.gov/doi/full/10.1289/EHP4709 + # https://ehp.niehs.nih.gov/doi/10.1289/ehp.113-a51 + if "://ehp.niehs.nih.gov/doi/" in html_url: + # <a href="/doi/pdf/10.1289/EHP4709" target="_blank"> + if b'/doi/pdf/10.' in html_body: + url = html_url.replace('/doi/full/10.', '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.') + return dict(pdf_url=url, technique='ehp.niehs.nigh.gov-url') + + # cogentoa.com + # https://www.cogentoa.com/article/10.1080/23311975.2017.1412873 + if "://www.cogentoa.com/article/" in html_url and not ".pdf" in html_url: + # blech, it's a SPA! 
All JS + # https://www.cogentoa.com/article/10.1080/23311975.2017.1412873.pdf + url = html_url + ".pdf" + return dict(pdf_url=url, technique='cogentoa-url') + + # chemrxiv.org (likely to be other figshare domains also) + # https://chemrxiv.org/articles/Biradical_Formation_by_Deprotonation_in_Thiazole-Derivatives_The_Hidden_Nature_of_Dasatinib/10101419 + if "://chemrxiv.org/articles/" in html_url or '.figshare.org/articles/' in html_url: + # <script id="app-data" type="text/json"> [...] </script> + json_tag = soup.find('script', id="app-data", attrs={"type": "text/json"}) + if json_tag and json_tag.string: + app_data = json.loads(json_tag.string) + # "exportPdfDownloadUrl": "https://s3-eu-west-1.amazonaws.com/itempdf74155353254prod/10101419/Biradical_Formation_by_Deprotonation_in_Thiazole-Derivatives__The_Hidden_Nature_of_Dasatinib_v1.pdf" + url = app_data.get('article', {}).get('exportPdfDownloadUrl') + if url and url.startswith('http'): + return dict(pdf_url=url, technique='figshare-json') + + # CNKI COVID-19 landing pages + # http://en.gzbd.cnki.net/gzbt/detail/detail.aspx?FileName=HBGF202002003&DbName=GZBJ7920&DbCode=GZBJ + if '://en.gzbd.cnki.net/KCMS/detail/detail.aspx' in html_url: + # <a onclick="WriteKrsDownLog()" target="_blank" id="pdfDown" name="pdfDown" href="/gzbt/download.aspx?filename=4Q1ZYpFdKFUZ6FDR1QkRrolayRXV2ZzattyQ3QFa2JXTyZXUSV3QRFkbndzaGV2KyJXWZVEbFdVYnZndD9EOxg1Tj5Eeys2SMFzLZ5kcuFkM3dEbsR2ZjxEaShVdJhFdp90KhlVVzcjVVlXUVNHWBtWS5Rlb5cnc&tablename=GZBJLAST2020&dflag=pdfdown
 "><i></i>PDF Download</a> + href = soup.find('a', attrs={"id":"pdfDown"}) + if href: + url = href['href'].strip().replace('
', '') + if not url.startswith('http'): + url = host_prefix + url + return dict(pdf_url=url, technique='cnki-href') + + # RWTH AACHEN repository + if '://publications.rwth-aachen.de/record/' in html_url: + record_id = html_url.split('/')[-1] + url = f"{html_url}/files/{record_id}.pdf" + if record_id.isdigit() and url.encode('utf-8') in html_body: + return dict(pdf_url=url, technique='rwth-aachen-url') + + # physchemaspects.ru + if '://physchemaspects.ru/' in html_url and soup: + for href in soup.find_all('a'): + if href.text == "download PDF file": + url = href['href'] + if url.startswith('/'): + url = host_prefix + url + return dict(pdf_url=url, technique='physchemaspects-href') + + # OJS 3 (some) + if meta_generator and meta_generator.startswith("Open Journal Systems"): + href = soup.find('a', attrs={"class":"obj_galley_link file"}) + if href and href.text and "pdf" in href.text.lower(): + url = href['href'].strip() + if url.startswith('/'): + url = host_prefix + url + return dict(pdf_url=url, technique='ojs-galley-href') + + # ETH zurich e-periodica + if '://www.e-periodica.ch/digbib/view' in html_url: + url = html_url.replace('digbib/view', 'cntmng').split('#')[0] + if url.encode('utf-8') in html_body: + return dict(pdf_url=url, technique='href-eperiodica') + + # JMIR + # https://mhealth.jmir.org/2020/7/e17891/ + if '.jmir.org/' in html_url and not "/pdf" in html_url and html_url.endswith("/"): + url = html_url + "pdf" + return dict(pdf_url=url, technique='jmir-url') + + ### below here we are doing guesses + + # generic guess: try current URL plus .pdf, if it exists in the HTML body + if not '.pdf' in html_url: + url = html_url + ".pdf" + if url.encode('utf-8') in html_body: + return dict(pdf_url=url, technique='guess-url-plus-pdf') + + return dict() + +def test_regex(): + lines = """ + blah + var journalURL = "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"; + asdf""" + m = OVID_JOURNAL_URL_REGEX.search(lines) + assert m.group(1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689" + + lines = """ + window.onload = function () { + window.location = 
'https://pdf.sciencedirectassets.com/320270/AIP/1-s2.0-S2590109519300424/main.pdf?X-Amz-Security-Token=IQoJb3JpZ2luX2VjEH0aCXVzLWVhc3QtMSJGMEQCICBF0dnrtKfpcs3T1kOjMS9w9gedqiLBrcbp4aKQSP8fAiAT9G426t6FWXHO2zPSXRFLq2eiqgbew2vkNKbcn87teyq9Awj1%2F%2F%2F%2F%2F%2F%2F%2F%2F%2F8BEAIaDDA1OTAwMzU0Njg2NSIMnZcTRhbvMwF%2F5PA5KpEDdN%2FDI4V%2BNMDWQDFeAdUc99Lyxak%2B6vhAsfCBCf8hhvrRpalz75e74%2FXMAQwMN9m6i98o0Ljv9od7cuQEy8t%2B0DLzjzX5n3%2FxmpttowhMUm1jc8tBniLKBjwhTyiSHwhdeaVZf6x2zCJ0EIOWMNJHp3iFEqpaFvkRZbC1KWK4XPNNKo72HCvXuG7xmGrdHByz91AP7UgIYCy4hT10fnM43gbOE4wW8fqpgnvwCId%2F2u8k4rQoCLBqLYZzqshCRm1DBbsXCQhTwDXiMC2Ek3f63yKgw7rRCAxvs0vqirG%2B4mJ6LADaztAFMtKDPfnd4e%2B7%2FvnKU2NeotrqrkRgOkIAoFumbQXf20ky6mKWyHBk%2FxirVp60vUcLQpUm2Pcp6ythYxUi9IJxRGX8EF6aV4UHuCpUDUE7o8N84KUXIedUpytUZx7Xoxfk9w%2BR3%2FgX4LEHfkrWgiFAS3bVxNGOeV7GTwcXdcAggbdCaiAe46dfv7DDedx0KhVKOPH7obfvShqd6TYc0BjrV4sx61594ZJ3%2FO0ws7Lj8AU67AF17%2B1NZ3Ugu%2BwG9Ys9s7OxG8E4kBJ58vEY1yuBOQK9y2we4%2FTGPuqSxCuezqA%2BseslXYP%2FRc%2FZL9xx%2FUYaSjZhk1p1mhojxgBrckJYU7d8c4ELMPmtVy6R1yd2VDUoawEU8SB7nbNnMKzqQ3RgGgqGJiELys6dt%2FIr%2BVhpqM%2FZT4zadvzs8P%2FLoGzUHJKNZt0f99wLvZilphV92E%2BOUnwC4wbg3i3af3zozULwgEr7T%2FX2VsyREgexlzk76qMALPn0lgnciUyyQXxyUWAilXYQ0mQdXefh9lFfycczvt0UEuarX9p1sMwl8Ve5aw%3D%3D&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20200110T210936Z&X-Amz-SignedHeaders=host&X-Amz-Expires=300&X-Amz-Credential=ASIAQ3PHCVTY23CMDBNC%2F20200110%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=b43525576e1a0fdbab581481a3fe6db2862cbb2c69f2860b70cc8d444ccd73d5&hash=ccd128dfe597e704224bdfb4b3358de29b2be5d95887c71076bdab1236ba9e42&host=68042c943591013ac2b2430a89b270f6af2c76d8dfd086a07176afe7c76c2c61&pii=S2590109519300424&tid=spdf-74468ebd-6be6-43ac-b294-ced86e8eea58&sid=f9676d658285a749c46b6d081d965bb12aa8gxrqa&type=client'; + refreshOriginalWindow(); + } + """ + url = 
"https://pdf.sciencedirectassets.com/320270/AIP/1-s2.0-S2590109519300424/main.pdf?X-Amz-Security-Token=IQoJb3JpZ2luX2VjEH0aCXVzLWVhc3QtMSJGMEQCICBF0dnrtKfpcs3T1kOjMS9w9gedqiLBrcbp4aKQSP8fAiAT9G426t6FWXHO2zPSXRFLq2eiqgbew2vkNKbcn87teyq9Awj1%2F%2F%2F%2F%2F%2F%2F%2F%2F%2F8BEAIaDDA1OTAwMzU0Njg2NSIMnZcTRhbvMwF%2F5PA5KpEDdN%2FDI4V%2BNMDWQDFeAdUc99Lyxak%2B6vhAsfCBCf8hhvrRpalz75e74%2FXMAQwMN9m6i98o0Ljv9od7cuQEy8t%2B0DLzjzX5n3%2FxmpttowhMUm1jc8tBniLKBjwhTyiSHwhdeaVZf6x2zCJ0EIOWMNJHp3iFEqpaFvkRZbC1KWK4XPNNKo72HCvXuG7xmGrdHByz91AP7UgIYCy4hT10fnM43gbOE4wW8fqpgnvwCId%2F2u8k4rQoCLBqLYZzqshCRm1DBbsXCQhTwDXiMC2Ek3f63yKgw7rRCAxvs0vqirG%2B4mJ6LADaztAFMtKDPfnd4e%2B7%2FvnKU2NeotrqrkRgOkIAoFumbQXf20ky6mKWyHBk%2FxirVp60vUcLQpUm2Pcp6ythYxUi9IJxRGX8EF6aV4UHuCpUDUE7o8N84KUXIedUpytUZx7Xoxfk9w%2BR3%2FgX4LEHfkrWgiFAS3bVxNGOeV7GTwcXdcAggbdCaiAe46dfv7DDedx0KhVKOPH7obfvShqd6TYc0BjrV4sx61594ZJ3%2FO0ws7Lj8AU67AF17%2B1NZ3Ugu%2BwG9Ys9s7OxG8E4kBJ58vEY1yuBOQK9y2we4%2FTGPuqSxCuezqA%2BseslXYP%2FRc%2FZL9xx%2FUYaSjZhk1p1mhojxgBrckJYU7d8c4ELMPmtVy6R1yd2VDUoawEU8SB7nbNnMKzqQ3RgGgqGJiELys6dt%2FIr%2BVhpqM%2FZT4zadvzs8P%2FLoGzUHJKNZt0f99wLvZilphV92E%2BOUnwC4wbg3i3af3zozULwgEr7T%2FX2VsyREgexlzk76qMALPn0lgnciUyyQXxyUWAilXYQ0mQdXefh9lFfycczvt0UEuarX9p1sMwl8Ve5aw%3D%3D&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20200110T210936Z&X-Amz-SignedHeaders=host&X-Amz-Expires=300&X-Amz-Credential=ASIAQ3PHCVTY23CMDBNC%2F20200110%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=b43525576e1a0fdbab581481a3fe6db2862cbb2c69f2860b70cc8d444ccd73d5&hash=ccd128dfe597e704224bdfb4b3358de29b2be5d95887c71076bdab1236ba9e42&host=68042c943591013ac2b2430a89b270f6af2c76d8dfd086a07176afe7c76c2c61&pii=S2590109519300424&tid=spdf-74468ebd-6be6-43ac-b294-ced86e8eea58&sid=f9676d658285a749c46b6d081d965bb12aa8gxrqa&type=client" + m = SCIENCEDIRECT_BOUNCE_URL_REGEX.search(lines) + assert m.group(1) == url diff --git a/python/sandcrawler/html_ingest.py b/python/sandcrawler/html_ingest.py new file mode 100644 index 0000000..f11cac4 --- /dev/null +++ b/python/sandcrawler/html_ingest.py @@ -0,0 +1,441 @@ + +import io +import sys +import json +import datetime +import argparse +import xml.etree.ElementTree as ET +from typing import List, Optional, Any, Tuple + +import trafilatura +import pydantic +from selectolax.parser import HTMLParser + +from sandcrawler.ia import WaybackClient, CdxApiClient, ResourceResult, cdx_to_dict, fix_transfer_encoding, NoCaptureError, WaybackContentError +from sandcrawler.misc import gen_file_metadata, parse_cdx_datetime, datetime_to_cdx, clean_url, url_fuzzy_equal +from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules + + +TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}" + +def html_extract_body_teixml(doc: bytes) -> dict: + try: + tei_xml = trafilatura.extract(doc, + tei_output=True, + include_comments=False, + include_formatting=True, + ) + except (ValueError, TypeError, Exception) as e: + return dict( + status="trafilatura-parse-error", + error_msg=str(e)[:1000], + ) + if tei_xml: + body_txt = teixml_body_text(tei_xml) + word_count = len(body_txt.split()) + return dict(status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count) + elif doc.startswith(b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 2012"http://www.w3.org/TR/html4/loose.dtd">'): + # hack for firstmonday.org + return html_extract_body_teixml(doc[106:]) + else: + return dict(status="empty-xml", agent=TRAFILATURA_AGENT) + +def teixml_body_text(doc_xml: str) -> str: + ns = 
{"tei": "http://www.tei-c.org/ns/1.0"} + tree = ET.fromstring(doc_xml) + body = tree.find('.//tei:body', ns) + if body: + return " ".join(body.itertext()) + else: + return "" + +class WebResource(pydantic.BaseModel): + surt: str + timestamp: datetime.datetime + url: str + sha1hex: str + mimetype: str + status_code: int + size: Optional[int] + sha256hex: Optional[str] + resource_type: Optional[str] + + class Config: + json_encoders = { + datetime.datetime: lambda dt: dt.isoformat() + } + +class IngestWebResult(pydantic.BaseModel): + status: str + hit: bool + error_message: Optional[str] + cdx: Optional[dict] + terminal: Optional[Any] # TODO + request: Optional[Any] # TODO + file_meta: Optional[dict] + html_biblio: Optional[BiblioMetadata] + scope: Optional[str] + html_body: Optional[dict] + html_resources: Optional[List[WebResource]] + + class Config: + arbitrary_types_allowed = True + json_encoders = { + datetime.datetime: lambda dt: dt.isoformat(), + } + +class HtmlMetaRow(pydantic.BaseModel): + sha1hex: str + status: str + scope: Optional[str] + has_teixml: bool + has_thumbnail: bool + word_count: Optional[int] + biblio: Optional[dict] + resources: Optional[List[dict]] + + class Config: + arbitrary_types_allowed = True + json_encoders = { + datetime.datetime: lambda dt: dt.isoformat(), + } + + def to_sql_tuple(self) -> Tuple: + """ + This is for the html_meta SQL table. + """ + return ( + self.sha1hex, + datetime.datetime.now(), # updated + self.status, + self.scope, + self.has_teixml, + self.has_thumbnail, + self.word_count, + (self.biblio or None) and json.dumps(self.biblio, sort_keys=True), + (self.resources or None) and json.dumps(self.resources, sort_keys=True), + ) + + +def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime]) -> List[WebResource]: + """ + This is the lazy version that just does a CDX lookup for each resource. + + Takes a list instead of single record because we may want to circuit break + on failure, and may introduce concurrency internal to this function. + """ + + full = [] + closest = when and datetime_to_cdx(when) + for resource in resources: + cdx_row = cdx_client.lookup_best(resource['url'], closest=closest) + if not cdx_row: + raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") + if cdx_row.url != resource['url'] and not url_fuzzy_equal(cdx_row.url, resource['url']): + print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr) + if not cdx_row.status_code: + # TODO: fall back to a full fetch? + print(f" WARN: skipping revisit record", file=sys.stderr) + continue + full.append(WebResource( + surt=cdx_row.surt, + timestamp=cdx_row.datetime, + url=cdx_row.url, + sha1hex=cdx_row.sha1hex, + mimetype=cdx_row.mimetype, + status_code=cdx_row.status_code, + size=None, + sha256hex=None, + resource_type=resource['type'], + )) + + return full + + +def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]) -> List[WebResource]: + """ + This is the full version which fetches each resource from wayback/petabox + and calculates additional hashes. 
+ + Could make this concurrent in the future, eg: https://realpython.com/python-concurrency/#threading-version + """ + + full = [] + closest = when and datetime_to_cdx(when) + for resource in resources: + wayback_resp = wayback_client.lookup_resource(resource['url'], closest=closest) + if not wayback_resp or wayback_resp.status != 'success': + raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") + file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True) + if file_meta['sha1hex'] != wayback_resp.cdx.sha1hex: + raise WaybackContentError(f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}") + full.append(WebResource( + surt=wayback_resp.cdx.surt, + timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime), + url=wayback_resp.cdx.url, + sha1hex=file_meta['sha1hex'], + mimetype=file_meta['mimetype'], + status_code=wayback_resp.cdx.status_code or wayback_resp.revisit_cdx.status_code, + size=file_meta['size_bytes'], + sha256hex=file_meta['sha256hex'], + resource_type=resource['type'], + )) + + return full + + +def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]) -> Optional[str]: + + generator: Optional[str] = None + generator_elem = doc.css_first("meta[name='generator']") + if generator_elem: + generator = generator_elem.attrs['content'] + else: + generator_elem = doc.css_first("a[id='developedBy']") + if generator_elem: + generator = generator_elem.text() + if generator and "open journal systems 3" in generator.lower(): + return "ojs3" + elif generator and "open journal systems" in generator.lower(): + return "ojs" + elif generator and "plone" in generator.lower(): + return "plone" + elif generator and "wordpress" in generator.lower(): + return "wordpress" + elif generator and "blogger" in generator.lower(): + return "blogger" + elif doc.css_first("body[id='pkp-common-openJournalSystems']"): + return "ojs" + else: + try: + if 'powered by <a target="blank" href="http://pkp.sfu.ca/ojs/">PKP OJS</a>' in doc.html: + return "ojs" + if 'Powered by <a target="_blank" href="http://arphahub.com">' in doc.html: + return "arpha" + if "<meta property='og:image' content='http://cms.galenos.com.tr' />" in doc.html: + return "galenos" + except UnicodeDecodeError: + pass + + icon_elem = doc.css_first("link[type='image/x-icon']") + if icon_elem and 'href' in icon_elem.attrs: + if 'journalssystem.com' in icon_elem.attrs['href']: + return "journalssystem.com" + elif 'indexcopernicus.com' in icon_elem.attrs['href']: + return "indexcopernicus" + + if 'scielo' in url: + return "scielo" + + return None + +def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]) -> str: + """ + This function tries to guess if an HTML document represents one of: + + - article-fulltext + - article-abstract + - article-sample + - supplement + - component + - issue-fulltext + - landingpage + - homepage-domain + - blocked-paywall + - blocked-login + - blocked-captcha + - blocked-cookie + - errorpage + - stub + - other + - unknown + + Unknown implies the page could be anything. "other" implies it is not + fulltext or a landing page, but could be one of the other categories. 
+ """ + + # assert that this is a real URL + assert url.count('/') >= 2 + + # basic paywall and loginwall detection based on URL + if url.endswith("/cookieAbsent"): + return "blocked-cookie" + if "://page-one.live.cf.public.springer.com" in url: + return "article-sample" + + if "scielo" in url: + if "sci_abstract" in url: + return "landingpage" + if "sci_arttext" in url: + return "article-fulltext" + + if "showcaptcha.asp" in url: + return "blocked-captcha" + + # is this the top-level URL of the domain? aka, no path? + if url.count('/') <= 2 or (url.count('/') == 3) and url.endswith('/'): + return "homepage-domain" + + platform = html_guess_platform(url, doc, biblio) + + if biblio: + if biblio.html_fulltext_url: + if url_fuzzy_equal(biblio.html_fulltext_url, url): + return "article-fulltext" + else: + return "landingpage" + + # platform-specific detection + if platform in ("ojs", "ojs3"): + + if biblio and biblio.title: + if word_count and word_count > 1200: + return "fulltext" + else: + return "landingpage" + else: + if "/article/view/" in url and word_count and word_count > 600: + return "fulltext" + return "other" + elif platform == "journalssystem.com": + if biblio and biblio.pdf_fulltext_url and word_count and word_count < 1000: + return "landingpage" + + # more platform/publisher specific checks + if "karger.com/Article/Abstract" in url: + return "landingpage" + if "dergipark.gov.tr" in url and not ("download/article-file" in url): + return "other" + + try: + if isinstance(doc.html, str) and "<center><h1>403 Forbidden</h1></center>" in doc.html: + # cloudflare block pattern + return "blocked-forbidden" + except UnicodeDecodeError: + pass + + print(f" scope guessing: platform {platform} word count: {word_count}", file=sys.stderr) + + # fallback: guess based on word count (arbitrary guesses here) + if word_count is not None: + if word_count < 20: + return "stub" + elif word_count > 500 and platform in ['wordpress', 'blogger']: + return "article-fulltext" + elif word_count > 1200: + return "article-fulltext" + + return "unknown" + + +def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = False) -> IngestWebResult: + + adblock = load_adblock_rules() + wayback_client = WaybackClient() + + html_resource = wayback_client.lookup_resource(url, "text/html", closest=timestamp) + if html_resource.status != "success": + return IngestWebResult( + status=html_resource.status, + hit=False, + cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), + ) + + assert html_resource.terminal_status_code == 200 + + file_meta = gen_file_metadata(html_resource.body) + file_meta, html_resource = fix_transfer_encoding(file_meta, html_resource) + + if file_meta['mimetype'] not in ("text/html", "text/xml"): + return IngestWebResult( + status="wrong-mimetype", + hit=False, + cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), + file_meta=file_meta, + ) + + html_doc = HTMLParser(html_resource.body) + html_biblio = html_extract_biblio(url, html_doc) + html_body = html_extract_body_teixml(html_resource.body) + html_scope = html_guess_scope(url, html_doc, html_biblio, html_body.get('word_count')) + if html_scope not in ('article-fulltext', 'unknown'): + return IngestWebResult( + status="wrong-scope", + hit=False, + cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), + file_meta=file_meta, + html_biblio=html_biblio, + scope=html_scope, + ) + + raw_resources = html_extract_resources(html_resource.terminal_url, html_doc, adblock) + assert len(raw_resources) <= 200 + + when = 
parse_cdx_datetime(html_resource.cdx.datetime) + + full_resources: List[WebResource] = [] + if quick_mode: + full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, when) + else: + full_resources = fetch_html_resources(raw_resources, wayback_client, when) + + output = IngestWebResult( + status="success", + hit=True, + cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx), + file_meta=file_meta, + html_body=html_body, + html_biblio=html_biblio, + scope=html_scope, + html_resources=full_resources, + ) + return output + + +def main() -> None: + """ + Run this command like: + + python -m sandcrawler.html_ingest + """ + + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + subparsers = parser.add_subparsers() + + sub = subparsers.add_parser( + "single", help="tries to ingest a single URL, dumps result to stdout" + ) + sub.set_defaults(func="run_single") + sub.add_argument( + "url", + help="URL to fetch", + type=str, + ) + sub.add_argument( + "--timestamp", + help="timestamp for which to fetch document from wayback", + type=str, + ) + sub.add_argument( + "--quick-mode", + help="don't fetch resources, only do CDX lookup", + action="store_true", + ) + + args = parser.parse_args() + if not args.__dict__.get("func"): + parser.print_help(file=sys.stderr) + sys.exit(-1) + + if args.func == "run_single": + result = run_single(args.url, args.timestamp, args.quick_mode) + print(result.json(indent=2, exclude_none=True)) + else: + #func = getattr(wp, args.func) + #func() + raise NotImplementedError() + +if __name__ == "__main__": + main() diff --git a/python/sandcrawler/html_metadata.py b/python/sandcrawler/html_metadata.py new file mode 100644 index 0000000..1a328ef --- /dev/null +++ b/python/sandcrawler/html_metadata.py @@ -0,0 +1,857 @@ + +import sys +import datetime +from typing import List, Optional, Any, Tuple, Dict +import urllib.parse + +import dateparser +from selectolax.parser import HTMLParser +import pydantic +import braveblock + +from sandcrawler.misc import url_fuzzy_equal + + +# this is a map of metadata keys to CSS selectors +# sources for this list include: +# - google scholar crawling notes (https://scholar.google.com/intl/ja/scholar/inclusion.html#indexing) +# - inspection of actual publisher HTML +# - http://div.div1.com.au/div-thoughts/div-commentaries/66-div-commentary-metadata +# - "HTML meta tags used by journal articles" +# https://gist.github.com/hubgit/5985963 +# order of these are mostly by preference/quality (best option first), though +# also/sometimes re-ordered for lookup efficiency (lookup stops after first +# match) +HEAD_META_PATTERNS: Any = { + "title": [ + "meta[name='citation_title']", + "meta[name='eprints.title']", + "meta[name='prism.title']", + "meta[name='bepress_citation_title']", + "meta[name='og:title']", + "meta[name='dcterms.title']", + "meta[name='dc.title']", + ], + "subtitle": [ + "meta[name='prism.subtitle']", + ], + "doi": [ + "meta[name='citation_doi']", + "meta[name='DOI']", + "meta[id='DOI']", + "meta[name='prism.doi']", + "meta[name='bepress_citation_doi']", + "meta[name='dc.identifier.doi']", + "meta[name='dc.identifier'][scheme='doi']", + ], + "pmid": [ + "meta[name='citation_pmid']", + ], + "abstract": [ + "meta[name='citation_abstract']", + "meta[name='bepress_citation_abstract']", + "meta[name='eprints.abstract']", + "meta[name='dcterms.abstract']", + "meta[name='prism.teaser']", + "meta[name='dc.description']", + "meta[name='og:description']", + ], + "container_name": 
[ + "meta[name='citation_journal_title']", + "meta[name='bepress_citation_journal_title']", + "meta[name='citation_conference_title']", + "meta[name='bepress_citation_conference_title']", + "meta[name='prism.publicationName']", + "meta[name='eprints.publication']", + "meta[name='dc.relation.ispartof']", + "meta[name='dc.source']", + "meta[property='og:site_name']", + ], + "container_abbrev": [ + "meta[name='citation_journal_abbrev']", + ], + "raw_date": [ + "meta[name='citation_publication_date']", + "meta[name='bepress_citation_publication_date']", + "meta[name='prism.publicationDate']", + "meta[name='citation_date']", + "meta[name='bepress_citation_date']", + "meta[name='citation_online_date']", + "meta[name='bepress_citation_online_date']", + "meta[itemprop='datePublished']", + "meta[name='article:published']", + "meta[name='eprints.datestamp']", + "meta[name='eprints.date']", + "meta[name='dc.date.created']", + "meta[name='dc.issued']", + "meta[name='dcterms.date']", + "meta[name='dc.date']", + ], + "release_year": [ + "meta[itemprop='citation_year']", + "meta[itemprop='prism:copyrightYear']", + ], + "first_page": [ + "meta[name='citation_firstpage']", + "meta[name='bepress_citation_firstpage']", + "meta[name='prism.startingPage']", + "meta[name='dc.citation.spage']", + ], + "last_page": [ + "meta[name='citation_lastpage']", + "meta[name='bepress_citation_lastpage']", + "meta[name='prism.endingPage']", + "meta[name='dc.citation.epage']", + ], + "issue": [ + "meta[name='citation_issue']", + "meta[name='bepress_citation_issue']", + "meta[name='prism.issueIdentifier']", + "meta[name='dc.citation.issue']", + ], + "volume": [ + "meta[name='citation_volume']", + "meta[name='bepress_citation_volume']", + "meta[name='prism.volume']", + "meta[name='dc.citation.volume']", + ], + "number": [ + "meta[name='citation_technical_report_number']", + "meta[name='bepress_citation_technical_report_number']", + "meta[name='citation_number']", + "meta[name='bepress_citation_number']", + "meta[name='prism.number']", + ], + "container_issn": [ + "meta[name='citation_issn']", + "meta[name='bepress_citation_issn']", + "meta[name='prism.issn']", + "meta[name='prism.eIssn']", + "meta[name='eprints.issn']", + "meta[name='dc.source.issn']", + ], + "isbn": [ + "meta[name='citation_isbn']", + "meta[name='bepress_citation_isbn']", + "meta[name='prism.isbn']", + ], + "publisher": [ + "meta[name='citation_publisher']", + "meta[name='bepress_citation_publisher']", + "meta[name='eprints.publisher']", + "meta[name='citation_technical_report_institution']", + "meta[name='dcterms.publisher']", + "meta[name='dc.publisher']", + ], + "raw_release_type": [ + "meta[name='citation_article_type']", + "meta[name='bepress_citation_article_type']", + "meta[name='prism.contentType']", + "meta[name='eprints.type']", + "meta[name='dc.type']", + ], + "lang": [ + "meta[name='citation_language']", + "meta[name='bepress_citation_language']", + "meta[name='dcterms.language']", + "meta[name='dc.language']", + "meta[name='og:locale']", + ], +} + +HEAD_META_LIST_PATTERNS: Any = { + "contrib_names": [ + "meta[name='citation_author']", + "meta[name='bepress_citation_author']", + "meta[name='eprints.creators_name']", + "meta[name='dcterms.creator']", + "meta[name='article:author']", + "meta[name='dc.creator']", + "meta[name='dc.contributor']", + ], + # TODO: citation_author_institution + "raw_references": [ + "meta[name='citation_reference']", + ], + "raw_identifiers": [ + "meta[name='eprints.id_number']", + "meta[name='dcterms.identifier']", + 
"meta[name='dc.identifier']", + ], +} + +XML_FULLTEXT_PATTERNS: List[dict] = [ + { + "selector": "meta[name='citation_xml_url']", + "attr": "content", + "technique": "citation_xml_url", + }, + { + "selector": "meta[name='fulltext_xml']", + "attr": "content", + "technique": "fulltext_xml", + }, + { + "selector": "link[rel='alternate'][type='application/xml']", + "attr": "href", + "technique": "alternate link", + }, + { + "selector": "link[rel='alternate'][type='text/xml']", + "attr": "href", + "technique": "alternate link", + }, + { + "in_doc_url": "scielo", + "in_fulltext_url": "articleXML", + "selector": "a[target='xml']", + "attr": "href", + "technique": "SciElo XML link", + }, + { + "in_doc_url": "/article/view/", + "in_fulltext_url": "viewXML", + "selector": "a[class='obj_galley_link']", + "attr": "href", + "technique": "OJS Gallery XML link", + }, + { + "in_fulltext_url": "/download/xml/", + "selector": "a[title='XML']", + "attr": "href", + "technique": "ARPHA XML link", + "example_page": "https://zookeys.pensoft.net/article/26391", + }, + { + "in_doc_url": "frontiersin.org/", + "in_fulltext_url": "xml", + "selector": "a.download-files-nlm", + "attr": "href", + "technique": "XML (NLM) download link (frontiersin.org)", + "example_page": "https://www.frontiersin.org/articles/10.3389/fnins.2021.722592/full", + }, +] + +HTML_FULLTEXT_PATTERNS: List[dict] = [ + { + "selector": "meta[name='citation_fulltext_html_url']", + "attr": "content", + "technique": "citation_fulltext_html_url", + }, + { + "selector": "link[rel='alternate'][type='text/html']", + "attr": "href", + "technique": "alternate link", + }, + { + "in_doc_url": "/article/view/", + "in_fulltext_url": "inline=1", + "selector": "iframe[name='htmlFrame']", + "attr": "src", + "technique": "OJS HTML iframe", + }, + { + "in_doc_url": "dovepress.com", + "in_fulltext_url": "-fulltext-", + "selector": "a[id='view-full-text']", + "attr": "href", + "technique": "dovepress fulltext link", + }, +] + +COMPONENT_FULLTEXT_PATTERNS: List[dict] = [ + { + "in_doc_url": "pensoft.net/article/", # also /element/ + "in_fulltext_url": "/download/fig/", + "selector": ".Main-Content .figure a.P-Article-Preview-Picture-Download-Small", + "attr": "href", + "technique": "Active figure download link (zookeys)", + "example_page": "https://zookeys.pensoft.net/article/38576/element/2/153/", + }, +] + +# This is a database of matching patterns. Most of these discovered by hand, +# looking at OA journal content that failed to craw/ingest. 
+PDF_FULLTEXT_PATTERNS: List[dict] = [ + { + "selector": "head meta[name='citation_pdf_url']", + "attr": "content", + "technique": "citation_pdf_url", + }, + { + "selector": "head meta[name='bepress_citation_pdf_url']", + "attr": "content", + "technique": "citation_pdf_url", + }, + { + "in_doc_url": "journals.lww.com", + "selector": "head meta[name='wkhealth_pdf_url']", + "attr": "content", + "technique": "wkhealth_pdf_url", + "example_page": "https://journals.lww.com/otainternational/Fulltext/2019/03011/Trauma_systems_in_North_America.2.aspx", + }, + { + "selector": "head meta[propery='citation_pdf_url']", + "attr": "content", + "technique": "citation_pdf_url", + # eg, researchgate + }, + { + "selector": "head meta[name='eprints.document_url']", + "attr": "content", + "technique": "citation_pdf_url (property)", + }, + { + "in_doc_url": "/doi/10.", + "in_fulltext_url": "/doi/pdf/", + "selector": "a.show-pdf", + "attr": "href", + "technique": "SAGE/UTP show-pdflink", + "example_page": "https://journals.sagepub.com/doi/10.1177/2309499019888836", + # also http://utpjournals.press/doi/10.3138/cjh.ach.54.1-2.05 + }, + { + "in_doc_url": "/doi/10.", + "in_fulltext_url": "/doi/pdf/", + "selector": "a[title='PDF']", + "attr": "href", + "technique": "title=PDF link", + "example_page": "https://pubs.acs.org/doi/10.1021/acs.estlett.9b00379", + }, + { + "in_doc_url": "/article/view/", + "selector": "a#pdfDownloadLink", + "attr": "href", + "technique": "pdfDownloadLink link", + "example_page": "http://www.revistas.unam.mx/index.php/rep/article/view/35503/32336", + }, + { + "in_fulltext_url": "/pdf/", + "selector": "a.show-pdf", + "attr": "href", + "technique": "SAGE PDF link", + "example_page": "http://journals.sagepub.com/doi/pdf/10.1177/2309499019888836", + }, + { + "in_doc_url": "://elifesciences.org/articles/", + "in_fulltext_url": "/download/", + "selector": "a[data-download-type='pdf-article']", + "attr": "href", + "technique": "eLife PDF link", + "example_page": "https://elifesciences.org/articles/59841", + }, + { + "in_doc_url": "://www.jcancer.org/", + "in_fulltext_url": ".pdf", + "selector": ".divboxright a.text-button", + "attr": "href", + "technique": "jcancer PDF link", + "example_page": "https://www.jcancer.org/v10p4038.htm", + }, + { + "in_doc_url": "://www.tandfonline.com/doi/full/10.", + "in_fulltext_url": "/pdf/", + "selector": "a.show-pdf", + "attr": "href", + "technique": "t+f show-pdf link", + "example_page": "https://www.tandfonline.com/doi/full/10.1080/19491247.2019.1682234", + }, + { + "in_doc_url": "article_id=", + "in_fulltext_url": "download.php", + "selector": "a.file.pdf", + "attr": "href", + "technique": "pdf file link", + "example_page": "http://journals.tsu.ru/psychology/&journal_page=archive&id=1815&article_id=40405", + }, + { + "in_doc_url": "/content/10.", + "in_fulltext_url": "pdf", + "selector": "a.pdf[title='Download']", + "attr": "href", + "technique": "pdf file link", + "example_page": "https://www.eurosurveillance.org/content/10.2807/1560-7917.ES.2020.25.11.2000230", + }, + { + "selector": "embed[type='application/pdf']", + "attr": "src", + "technique": "PDF embed", + "example_page": "http://www.jasstudies.com/DergiTamDetay.aspx?ID=3401", + }, + { + "in_doc_url": "/html/", + "in_fulltext_url": "create_pdf", + "selector": ".AbsPdfFigTab img[src='images/pdf-icon.jpg'] + a", + "attr": "href", + "technique": "PDF URL link", + "example_page": "http://www.aed.org.cn/nyzyyhjxb/html/2018/4/20180408.htm", + }, + { + "in_doc_url": "/archive-detail/", + "in_fulltext_url": 
".pdf", + "selector": ".contact-list a.download-pdf", + "attr": "href", + "technique": "PDF URL link", + "example_page": "http://www.bezmialemscience.org/archives/archive-detail/article-preview/editorial/20439", + }, + { + "in_doc_url": "degruyter.com/document/", + "in_fulltext_url": "/pdf", + "selector": "a.downloadPdf", + "attr": "href", + "technique": "PDF URL link", + "example_page": "https://www.degruyter.com/document/doi/10.1515/zaw-2021-0001/html", + }, + { + "in_doc_url": "repositorio.unicamp.br/handle/", + "in_fulltext_url": "/bitstream/", + "selector": "table.panel-body a[target='_blank']", + "attr": "href", + "technique": "PDF URL link", + "example_page": "http://www.repositorio.unicamp.br/handle/REPOSIP/287750", + }, + { + "in_doc_url": "dlc.library.columbia.edu/durst/", + "selector": "dd.blacklight-lib_non_item_in_context_url_ssm a[href]", + "attr": "href", + "technique": "Access URL link", + "example_page": "https://dlc.library.columbia.edu/durst/cul:18931zcrk9", + }, + { + "in_doc_url": "fldeploc.dep.state.fl.us/geodb_query/fgs_doi", + "in_fulltext_url": "pdf", + "selector": "p a[href]", + "attr": "href", + "technique": "PDF URL link", + "example_page": "http://fldeploc.dep.state.fl.us/geodb_query/fgs_doi.asp?searchCode=IC29", + }, + { + "in_doc_url": "preprints.jmir.org/preprint/", + "selector": "a.pdf-download-button", + "attr": "href", + "technique": "PDF URL link", + "example_page": "https://preprints.jmir.org/preprint/22556", + }, + { + "in_doc_url": "bloomsburycollections.com/", + "in_fulltext_url": "pdf", + "selector": "li.download-item a[href]", + "attr": "href", + "technique": "PDF URL link", + "example_page": "https://www.bloomsburycollections.com/book/the-political-economies-of-media-the-transformation-of-the-global-media-industries/the-political-economies-of-media-and-the-transformation-of-the-global-media-industries", + }, + { + "in_doc_url": "emerald.com/insight/content/", + "in_fulltext_url": "pdf", + "selector": "a.intent_pdf_link", + "attr": "href", + "technique": "PDF URL link", + "example_page": "https://www.emerald.com/insight/content/doi/10.1108/RAMJ-11-2020-0065/full/html", + }, + { + "in_doc_url": "ingentaconnect.com/content/", + "in_fulltext_url": "pdf", + "selector": "a.pdf[data-popup]", + "attr": "data-popup", + "technique": "PDF URL link", + "example_page": "https://www.ingentaconnect.com/content/ista/sst/2021/00000049/00000001/art00007", + }, + { + "in_doc_url": "library.wur.nl/", + "in_fulltext_url": "pdf", + "selector": "a.wl_full_text_restricted", + "attr": "href", + "technique": "PDF URL link", + "example_page": "https://library.wur.nl/WebQuery/wurpubs/529922", + }, + { + "in_doc_url": "/dlibra/", + "in_fulltext_url": "pdf", + "selector": "iframe#js-main-frame", + "attr": "src", + "technique": "PDF iframe (dlibra)", + "example_page": "https://dbc.wroc.pl/dlibra/docmetadata?showContent=true&id=41031", + }, + { + "in_doc_url": "/handle/", + "in_fulltext_url": "pdf", + "selector": "table.misc table.inner tr.b a", + "attr": "href", + "technique": "PDF URL link (DSpace, first file)", + "example_page": "https://orbi.uliege.be/handle/2268/174200", + }, + { + "in_doc_url": "/publications/", + "in_fulltext_url": "pdf", + "selector": ".publication-sidebar li.open-access a.document-link", + "attr": "href", + "technique": "PDF URL link (Pure repo, OA link)", + "example_page": "https://research.tue.nl/en/publications/lowering-the-threshold-for-computers-in-early-design-some-advance", + }, + { + "in_doc_url": "//hal", + "selector": ".widget-openaccess 
.widget-content a",
+        "attr": "href",
+        "technique": "Fulltext OA URL (HAL)",
+        "example_page": "https://hal.archives-ouvertes.fr/hal-00744951",
+    },
+    {
+        "in_doc_url": "/record/",
+        "in_fulltext_url": "pdf",
+        "selector": "#detailedrecordminipanelfile a",
+        "attr": "href",
+        "technique": "PDF URL link (Invenio)",
+        "example_page": "https://bib-pubdb1.desy.de/record/416556",
+    },
+    {
+        "in_doc_url": "/available/",
+        "in_fulltext_url": "pdf",
+        "selector": "table.file-table a",
+        "attr": "href",
+        "technique": "PDF URL link",
+        "example_page": "https://etd.adm.unipi.it/theses/available/etd-05302014-183910/",
+    },
+    {
+        "in_doc_url": "/islandora/",
+        "in_fulltext_url": "pdf",
+        "selector": "a.islandora-pdf-link",
+        "attr": "href",
+        "technique": "PDF URL link (Islandora)",
+        "example_page": "http://fau.digital.flvc.org/islandora/object/fau%3A9804",
+    },
+    {
+        "in_doc_url": "/receive/",
+        "in_fulltext_url": "pdf",
+        "selector": ".mir-preview noscript a",
+        "attr": "href",
+        "technique": "PDF iframe via noscript (MyCoRe)",
+        "example_page": "https://www.db-thueringen.de/receive/dbt_mods_00005191",
+    },
+    {
+        "in_doc_url": "/registro.do",
+        "in_fulltext_url": "imagenes",
+        "selector": ".resumen_bib a[data-analytics=media]",
+        "attr": "href",
+        "technique": "Media link (DIGIBIS)",
+        "example_page": "https://bivaldi.gva.es/es/consulta/registro.do?id=11740",
+    },
+    {
+        "in_doc_url": "/view",
+        "in_fulltext_url": "/at_download/",
+        "selector": ".documentContent #content a",
+        "attr": "href",
+        "technique": "Media link (Plone)",
+        "example_page": "http://xjornadaslc.fahce.unlp.edu.ar/actas/Ramon_Esteban_Chaparro.pdf/view",
+    },
+    {
+        "in_doc_url": "isca-speech.org/",
+        "in_fulltext_url": "pdf",
+        "selector": ".w3-container a",
+        "attr": "href",
+        "technique": "PDF URL link (isca-speech.org)",
+        "example_page": "https://www.isca-speech.org/archive/interspeech_2006/chitturi06b_interspeech.html",
+    },
+    {
+        "in_doc_url": "://repository.dri.ie/",
+        "in_fulltext_url": "/download",
+        "selector": "#dri_download_assets > div > a",
+        "attr": "href",
+        "technique": "Download link (repository.dri.ie)",
+        "example_page": "https://repository.dri.ie/catalog/qf8621102",
+    },
+    {
+        "in_doc_url": "frontiersin.org/",
+        "in_fulltext_url": "pdf",
+        "selector": "a.download-files-pdf",
+        "attr": "href",
+        "technique": "PDF Download link (frontiersin.org)",
+        "example_page": "https://www.frontiersin.org/articles/10.3389/fnins.2021.722592/full",
+    },
+    {
+        "in_doc_url": "cureus.com/",
+        "in_fulltext_url": "pdf",
+        "selector": ".small-medium-pdf a.pdf-download-button",
+        "attr": "href",
+        "technique": "PDF Download link (cureus.com)",
+        "example_page": "https://www.cureus.com/articles/69542-tramadol-induced-jerks",
+    },
+    {
+        "in_doc_url": "e-manuscripta.ch/",
+        "in_fulltext_url": "pdf",
+        "selector": "#titleinfoPdfDownload a.resourceLink",
+        "attr": "href",
+        "technique": "PDF Download link (e-manuscripta.ch)",
+        "example_page": "https://www.e-manuscripta.ch/zut/doi/10.7891/e-manuscripta-112176",
+    },
+]
+
+FULLTEXT_URL_PATTERNS_SKIP = [
+    # wiley has a weird almost-blank page we don't want to loop on
+    "://onlinelibrary.wiley.com/doi/pdf/",
+    "://doi.org/",
+    "://dx.doi.org/",
+]
+
+RELEASE_TYPE_MAP = {
+    "research article": "article-journal",
+    "text.serial.journal": "article-journal",
+}
+
+
+class BiblioMetadata(pydantic.BaseModel):
+    title: Optional[str]
+    subtitle: Optional[str]
+    contrib_names: Optional[List[str]]
+    release_date: Optional[datetime.date]
+    release_year: Optional[int]
+    release_type: Optional[str]
+    release_stage: Optional[str]
+    withdrawn_status: Optional[str]
+    lang: Optional[str]
+    country_code: Optional[str]
+    volume: Optional[str]
+    issue: Optional[str]
+    number: Optional[str]
+    pages: Optional[str]
+    first_page: Optional[str]
+    last_page: Optional[str]
+    license: Optional[str]
+    publisher: Optional[str]
+    container_name: Optional[str]
+    container_abbrev: Optional[str]
+    container_issn: Optional[str]
+    container_type: Optional[str]
+    raw_references: Optional[List[str]]
+
+    doi: Optional[str]
+    pmid: Optional[str]
+    pmcid: Optional[str]
+    isbn13: Optional[str]
+    publisher_ident: Optional[str]
+    oai_id: Optional[str]
+
+    abstract: Optional[str]
+    pdf_fulltext_url: Optional[str]
+    html_fulltext_url: Optional[str]
+    xml_fulltext_url: Optional[str]
+    component_url: Optional[str]
+
+    class Config:
+        json_encoders = {
+            datetime.date: lambda dt: dt.isoformat()
+        }
+
+
+def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, patterns: List[dict]) -> Optional[Tuple[str, str]]:
+    """
+    Tries to quickly extract fulltext URLs using a set of patterns. This
+    function is intended to be generic across various extraction techniques.
+
+    Returns None or a tuple of (url, technique)
+    """
+    self_doc_url: Optional[Tuple[str, str]] = None
+    for pattern in patterns:
+        if 'selector' not in pattern:
+            continue
+        if 'in_doc_url' in pattern:
+            if pattern['in_doc_url'] not in doc_url:
+                continue
+        elem = doc.css_first(pattern['selector'])
+        if not elem:
+            continue
+        if 'attr' in pattern:
+            val = elem.attrs.get(pattern['attr'])
+            if not val:
+                continue
+            val = urllib.parse.urljoin(doc_url, val)
+            assert val
+            if 'in_fulltext_url' in pattern:
+                if pattern['in_fulltext_url'] not in val:
+                    continue
+            skip_matched = False
+            for skip_pattern in FULLTEXT_URL_PATTERNS_SKIP:
+                if skip_pattern in val.lower():
+                    skip_matched = True
+                    break
+            if skip_matched:
+                continue
+            if url_fuzzy_equal(doc_url, val):
+                # don't link to self, unless no other options
+                self_doc_url = (val, pattern.get('technique', 'unknown'))
+                continue
+            return (val, pattern.get('technique', 'unknown'))
+    if self_doc_url:
+        print("  WARN: returning fulltext URL pointing to self", file=sys.stderr)
+        return self_doc_url
+    return None
+
+def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadata]:
+
+    meta: Any = dict()
+    head = doc.css_first("head")
+    if not head:
+        return None
+
+    for field, patterns in HEAD_META_PATTERNS.items():
+        for pattern in patterns:
+            val = head.css_first(pattern)
+            #print((field, pattern, val))
+            if val and 'content' in val.attrs and val.attrs['content']:
+                meta[field] = val.attrs['content']
+                break
+
+    for field, patterns in HEAD_META_LIST_PATTERNS.items():
+        for pattern in patterns:
+            val_list = head.css(pattern)
+            if val_list:
+                for val in val_list:
+                    if 'content' in val.attrs and val.attrs['content']:
+                        if field not in meta:
+                            meta[field] = []
+                        meta[field].append(val.attrs['content'])
+                break
+
+    # (some) fulltext extractions
+    pdf_fulltext_url = html_extract_fulltext_url(doc_url, doc, PDF_FULLTEXT_PATTERNS)
+    if pdf_fulltext_url:
+        meta['pdf_fulltext_url'] = pdf_fulltext_url[0]
+    xml_fulltext_url = html_extract_fulltext_url(doc_url, doc, XML_FULLTEXT_PATTERNS)
+    if xml_fulltext_url:
+        meta['xml_fulltext_url'] = xml_fulltext_url[0]
+    html_fulltext_url = html_extract_fulltext_url(doc_url, doc, HTML_FULLTEXT_PATTERNS)
+    if html_fulltext_url:
+        meta['html_fulltext_url'] = html_fulltext_url[0]
+    component_url = html_extract_fulltext_url(doc_url, doc, COMPONENT_FULLTEXT_PATTERNS)
+    if component_url:
+        meta['component_url'] = component_url[0]
+
+    # TODO: replace with clean_doi() et al
+    if meta.get('doi') and meta.get('doi').startswith('doi:'):
+        meta['doi'] = meta['doi'][4:]
+
+    raw_identifiers = meta.pop('raw_identifiers', [])
+    for ident in raw_identifiers:
+        if ident.startswith('doi:10.'):
+            if 'doi' not in meta:
+                meta['doi'] = ident.replace('doi:', '')
+        elif ident.startswith('10.') and '/' in ident:
+            if 'doi' not in meta:
+                meta['doi'] = ident
+        elif ident.startswith('isbn:'):
+            if 'isbn' not in meta:
+                meta['isbn'] = ident.replace('isbn:', '')
+
+    raw_date = meta.pop('raw_date', None)
+    if raw_date:
+        parsed = dateparser.parse(raw_date)
+        if parsed:
+            meta['release_date'] = parsed.date()
+
+    raw_release_type = meta.pop('raw_release_type', None)
+    if raw_release_type:
+        release_type = RELEASE_TYPE_MAP.get(raw_release_type.lower().strip())
+        if release_type:
+            meta['release_type'] = release_type
+
+    return BiblioMetadata(**meta)
+
+def load_adblock_rules() -> braveblock.Adblocker:
+    """
+    TODO: consider blocking very generic assets:
+    - ://fonts.googleapis.com/css*
+    - ://journals.plos.org/plosone/resource/img/icon.*
+    """
+    return braveblock.Adblocker(
+        include_easylist=True,
+        include_easyprivacy=True,
+        rules=[
+            "/favicon.ico^",
+            "||fonts.googleapis.com^",
+            "||widgets.figshare.com^",
+            "||crossmark-cdn.crossref.org^",
+            "||crossmark.crossref.org^",
+            "||platform.twitter.com^",
+            "||verify.nature.com^",
+            "||s7.addthis.com^",
+            "||www.mendeley.com^",
+            "||pbs.twimg.com^",
+            "||badge.dimensions.ai^",
+            "||recaptcha.net^",
+
+            # not sure about these CC badges (usually via a redirect)
+            #"||licensebuttons.net^",
+            #"||i.creativecommons.org^",
+
+            # Should we skip jquery, or other generic javascript CDNs?
+            #"||code.jquery.com^",
+            #"||ajax.googleapis.com^",
+            #"||cdnjs.cloudflare.com^",
+
+            # badges, "share" buttons, tracking, etc
+            "apis.google.com/js/plusone",
+            "www.google.com/recaptcha/",
+            "js/_getUACode.js",
+
+            # PLOS images
+            "/resource/img/icon.*.16.png^",
+        ],
+    )
+
+
+def _extract_generic(doc: HTMLParser, selector: str, attrs: List[str], type_name: str) -> list:
+    resources = []
+
+    for node in doc.css(selector):
+        for attr in attrs:
+            if attr not in node.attrs:
+                continue
+            url = node.attrs.get(attr)
+            # special-case a couple meta URI prefixes which don't match with adblock rules
+            skip = False
+            for prefix in ['about:', 'data:', 'magnet:', 'urn:', 'mailto:']:
+                if url and url.startswith(prefix):
+                    skip = True
+                    break
+            if skip:
+                continue
+            if url:
+                #print(url, file=sys.stderr)
+                resources.append(dict(url=url.strip(), type=type_name))
+
+    return resources
+
+
+def html_extract_resources(doc_url: str, doc: HTMLParser, adblock: braveblock.Adblocker) -> list:
+    """
+    This function tries to find all the important resources in a page. The
+    presumption is that the HTML document is article fulltext, and we want the
+    list of all resources (by URL) necessary to replay the page.
+
+    The returned resource URLs each have a type (script, img, css, etc), and
+    should be fully-qualified URLs (not relative).
+
+    Adblock filtering is run to remove unwanted resources.
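+
+    A minimal usage sketch (hedged; the URL, the `html_body` variable, and
+    the surrounding fetch are illustrative, not part of this function):
+
+        from selectolax.parser import HTMLParser
+
+        doc = HTMLParser(html_body)
+        adblock = load_adblock_rules()
+        resources = html_extract_resources("https://example.com/article/123", doc, adblock)
+        # each entry looks like {"url": "https://...", "type": "script"}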
+ """ + resources = [] + + # select various resource references + resources += _extract_generic(doc, "script", ["src"], "script") + resources += _extract_generic(doc, "link[rel='stylesheet']", ["href"], "stylesheet") + # TODO: srcset and parse + # eg: https://dfzljdn9uc3pi.cloudfront.net/2018/4375/1/fig-5-2x.jpg 1200w, https://dfzljdn9uc3pi.cloudfront.net/2018/4375/1/fig-5-1x.jpg 600w, https://dfzljdn9uc3pi.cloudfront.net/2018/4375/1/fig-5-small.jpg 355w + resources += _extract_generic(doc, "img", ["src"], "image") + resources += _extract_generic(doc, "audio", ["src"], "audio") + resources += _extract_generic(doc, "video", ["src"], "media") + resources += _extract_generic(doc, "source", ["src"], "media") + resources += _extract_generic(doc, "track", ["src"], "media") + resources += _extract_generic(doc, "iframe", ["src"], "subdocument") + resources += _extract_generic(doc, "embed", ["src"], "media") + + # ensure URLs are absolute + for r in resources: + r['url'] = urllib.parse.urljoin(doc_url, r['url']) + + # filter using adblocker + resources = [r for r in resources if adblock.check_network_urls(r['url'], source_url=doc_url, request_type=r['type']) == False] + + # remove duplicates + resources = [dict(t) for t in {tuple(d.items()) for d in resources}] + + return resources + diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py new file mode 100644 index 0000000..c586972 --- /dev/null +++ b/python/sandcrawler/ia.py @@ -0,0 +1,1138 @@ + +# XXX: some broken MRO thing going on in here due to python3 object wrangling +# in `wayback` library. Means we can't run pylint. +# pylint: skip-file + +import os +import sys +import time +import gzip +import json +import requests +import datetime +import urllib.parse +import urllib3.exceptions +from typing import Tuple +from collections import namedtuple + +import http.client + +# not sure this will really work. Should go before wayback imports. +http.client._MAXHEADERS = 1000 # type: ignore + +import wayback.exception +from http.client import IncompleteRead +from wayback.resourcestore import ResourceStore +from gwb.loader import CDXLoaderFactory3 + +from .misc import b32_hex, requests_retry_session, gen_file_metadata, clean_url + +class SandcrawlerBackoffError(Exception): + """ + A set of Exceptions which are raised through multiple abstraction layers to + indicate backpressure. For example, SPNv2 back-pressure sometimes needs to + be passed up through any timeout/retry code and become an actual long pause + or crash. 
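+
+    A hedged handling sketch (the caller, client variables, and back-off
+    policy here are illustrative only):
+
+        try:
+            resource = spn_client.crawl_resource(url, wayback_client)
+        except SandcrawlerBackoffError as e:
+            print("backing off: {}".format(e), file=sys.stderr)
+            time.sleep(180)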
+ """ + pass + +ResourceResult = namedtuple("ResourceResult", [ + "start_url", + "hit", + "status", + "terminal_url", + "terminal_dt", + "terminal_status_code", + "body", + "cdx", + "revisit_cdx", +]) + +WarcResource = namedtuple("WarcResource", [ + "status_code", + "location", + "body", + "revisit_cdx", +]) + +CdxRow = namedtuple('CdxRow', [ + 'surt', + 'datetime', + 'url', + 'mimetype', + 'status_code', + 'sha1b32', + 'sha1hex', + 'warc_csize', + 'warc_offset', + 'warc_path', +]) + +CdxPartial = namedtuple('CdxPartial', [ + 'surt', + 'datetime', + 'url', + 'mimetype', + 'status_code', + 'sha1b32', + 'sha1hex', +]) + +def cdx_partial_from_row(full): + return CdxPartial( + surt=full.surt, + datetime=full.datetime, + url=full.url, + mimetype=full.mimetype, + status_code=full.status_code, + sha1b32=full.sha1b32, + sha1hex=full.sha1hex, + ) + +def cdx_to_dict(cdx): + d = { + "surt": cdx.surt, + "datetime": cdx.datetime, + "url": cdx.url, + "mimetype": cdx.mimetype, + "status_code": cdx.status_code, + "sha1b32": cdx.sha1b32, + "sha1hex": cdx.sha1hex, + } + if type(cdx) == CdxRow and '/' in cdx.warc_path: + d['warc_csize'] = cdx.warc_csize + d['warc_offset'] = cdx.warc_offset + d['warc_path'] = cdx.warc_path + return d + +def fuzzy_match_url(left, right): + """ + Matches URLs agnostic of http/https (and maybe other normalizations in the + future) + """ + if left == right: + return True + if '://' in left and '://' in right: + left = '://'.join(left.split('://')[1:]) + right = '://'.join(right.split('://')[1:]) + if left == right: + return True + if left == right + "/" or right == left + "/": + return True + return False + +def test_fuzzy_match_url(): + assert fuzzy_match_url("http://thing.com", "http://thing.com") == True + assert fuzzy_match_url("http://thing.com", "https://thing.com") == True + assert fuzzy_match_url("http://thing.com", "ftp://thing.com") == True + assert fuzzy_match_url("http://thing.com", "http://thing.com/") == True + assert fuzzy_match_url("https://thing.com", "http://thing.com/") == True + assert fuzzy_match_url("https://thing.com/", "http://thing.com") == True + assert fuzzy_match_url("http://thing.com", "http://thing.com/blue") == False + + # should probably handle these? 
+ assert fuzzy_match_url("http://thing.com", "http://www.thing.com") == False + assert fuzzy_match_url("http://www.thing.com", "http://www2.thing.com") == False + assert fuzzy_match_url("http://www.thing.com", "https://www2.thing.com") == False + +class CdxApiError(Exception): + pass + +class CdxApiClient: + + def __init__(self, host_url="https://web.archive.org/cdx/search/cdx", **kwargs): + self.host_url = host_url + self.http_session = requests_retry_session(retries=3, backoff_factor=3) + cdx_auth_token = kwargs.get('cdx_auth_token', + os.environ.get('CDX_AUTH_TOKEN')) + if not cdx_auth_token: + raise Exception("CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)") + self.http_session.headers.update({ + 'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient', + 'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token), + }) + + def _query_api(self, params): + """ + Hits CDX API with a query, parses result into a list of CdxRow + """ + resp = self.http_session.get(self.host_url, params=params) + if resp.status_code != 200: + raise CdxApiError(resp.text) + #print(resp.url, file=sys.stderr) + if not resp.text: + return None + rj = resp.json() + if len(rj) <= 1: + return None + rows = [] + for raw in rj[1:]: + # check number of CDX fields; there is a bug with some rows having + # spaces in WARC filename resulting in extra bogus fields + if len(raw) != 11: + raise CdxApiError(f"CDX response had {len(raw)} fields, not 11 expected") + + # transform "-" ftp status code to a 226 + status_code = None + if raw[4] == "-": + if raw[3] != "warc/revisit" and raw[2].startswith("ftp://"): + status_code = 226 + else: + status_code = int(raw[4]) + + # CDX rows with no WARC records? + if raw[8] == '-' or raw[9] == '-' or raw[10] == '-': + continue + + row = CdxRow( + surt=raw[0], + datetime=raw[1], + url=raw[2], + mimetype=raw[3], + status_code=status_code, + sha1b32=raw[5], + sha1hex=b32_hex(raw[5]), + warc_csize=int(raw[8]), + warc_offset=int(raw[9]), + warc_path=raw[10], + ) + assert (row.mimetype == "-") or ("-" not in row) + rows.append(row) + return rows + + def fetch(self, url, datetime, filter_status_code=None, retry_sleep=None): + """ + Fetches a single CDX row by url/datetime. Raises a KeyError if not + found, because we expect to be looking up a specific full record. + """ + if len(datetime) != 14: + raise ValueError("CDX fetch requires full 14 digit timestamp. 
Got: {}".format(datetime)) + params = { + 'url': url, + 'from': datetime, + 'to': datetime, + 'matchType': 'exact', + 'limit': 1, + 'output': 'json', + } + if filter_status_code: + params['filter'] = "statuscode:{}".format(filter_status_code) + resp = self._query_api(params) + if not resp: + if retry_sleep and retry_sleep > 0: + next_sleep = None + if retry_sleep > 3: + next_sleep = retry_sleep - 3 + retry_sleep = 3 + print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr) + time.sleep(retry_sleep) + return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=next_sleep) + raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime)) + row = resp[0] + # allow fuzzy http/https match + if not (fuzzy_match_url(row.url, url) and row.datetime == datetime): + if retry_sleep and retry_sleep > 0: + print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr) + time.sleep(retry_sleep) + return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=None) + raise KeyError("Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(url, datetime, row)) + if filter_status_code: + assert row.status_code == filter_status_code + return row + + def lookup_best(self, url, max_age_days=None, best_mimetype=None, closest=None): + """ + Fetches multiple CDX rows for the given URL, tries to find the most recent. + + If no matching row is found, return None. Note this is different from fetch. + + Preference order by status code looks like: + + 200 or 226 + mimetype match + not-liveweb + most-recent + no match + not-liveweb + most-recent + 3xx + most-recent + 4xx + most-recent + 5xx + most-recent + + """ + params = { + 'url': url, + 'matchType': 'exact', + 'limit': -25, + 'output': 'json', + # Collapsing seems efficient, but is complex; would need to include + # other filters and status code in filter + #'collapse': 'timestamp:6', + + # Revisits now allowed and resolved! + #'filter': '!mimetype:warc/revisit', + } + if max_age_days: + since = datetime.date.today() - datetime.timedelta(days=max_age_days) + params['from'] = '%04d%02d%02d' % (since.year, since.month, since.day), + if closest: + params['closest'] = closest + params['sort'] = "closest" + #print(params, file=sys.stderr) + rows = self._query_api(params) + if not rows: + return None + + def _cdx_sort_key(r): + """ + This is a function, not a lambda, because it captures + best_mimetype. Will create a tuple that can be used to sort in + *reverse* order. 
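+
+            An illustrative key for a 200 capture from January 2020 whose
+            mimetype matches best_mimetype and which has a full WARC path
+            (all values made up):
+
+                (1, -200, 1, 1, 202001, 1, 20200114123456)
+            """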
+ """ + return ( + int(r.status_code in (200, 226)), + int(0 - (r.status_code or 999)), + int(r.mimetype == best_mimetype), + int(r.mimetype != "warc/revisit"), + int(r.datetime[:6]), + int('/' in r.warc_path), + int(r.datetime), + ) + + rows = sorted(rows, key=_cdx_sort_key) + return rows[-1] + + +class WaybackError(Exception): + pass + +class WaybackContentError(Exception): + pass + +class PetaboxError(Exception): + pass + +class NoCaptureError(Exception): + pass + +class WaybackClient: + + def __init__(self, cdx_client=None, **kwargs): + if cdx_client: + self.cdx_client = cdx_client + else: + self.cdx_client = CdxApiClient() + # /serve/ instead of /download/ doesn't record view count + # this *does* want to be http://, not https:// + self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/') + # gwb library will fall back to reading from /opt/.petabox/webdata.secret + self.petabox_webdata_secret = kwargs.get( + 'petabox_webdata_secret', + os.environ.get('PETABOX_WEBDATA_SECRET'), + ) + self.warc_uri_prefix = kwargs.get('warc_uri_prefix', 'https://archive.org/serve/') + self.rstore = None + self.max_redirects = 25 + self.wayback_endpoint = "https://web.archive.org/web/" + self.replay_headers = { + 'User-Agent': 'Mozilla/5.0 sandcrawler.WaybackClient', + } + + def fetch_petabox(self, csize, offset, warc_path, resolve_revisit=True): + """ + Fetches wayback resource directly from petabox using WARC path/offset/csize. + + If there is a problem with petabox, raises a PetaboxError. + If resource doesn't exist, would raise a KeyError (TODO). + + The body is only returned if the record is success (HTTP 200 or + equivalent). Otherwise only the status and header info is returned. + + WarcResource object (namedtuple) contains fields: + - status_code: int + - location: eg, for redirects + - body: raw bytes + + resolve_revist does what it sounds like: tries following a revisit + record by looking up CDX API and then another fetch. Refuses to recurse + more than one hop (eg, won't follow a chain of revisits). + + Requires (and uses) a secret token. 
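+
+        A hedged call sketch (the CDX row and client variable are
+        illustrative):
+
+            resource = wayback_client.fetch_petabox(
+                csize=cdx_row.warc_csize,
+                offset=cdx_row.warc_offset,
+                warc_path=cdx_row.warc_path,
+            )
+            if resource.status_code in (200, 226):
+                body = resource.body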
+ """ + if not self.petabox_webdata_secret: + raise Exception("WaybackClient needs petabox secret to do direct WARC fetches") + if not "/" in warc_path: + raise ValueError("what looks like a liveweb/SPN temporary warc path: {}".format(warc_path)) + warc_uri = self.warc_uri_prefix + warc_path + if not self.rstore: + self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory3( + webdata_secret=self.petabox_webdata_secret, + )) + try: + #print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr) + gwb_record = self.rstore.load_resource(warc_uri, offset, csize) + except wayback.exception.ResourceUnavailable: + print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr) + raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)") + except wayback.exception.InvalidResource: + print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr) + raise WaybackContentError("failed to load file contents from wayback/petabox (InvalidResource)") + except urllib3.exceptions.ReadTimeoutError as rte: + raise PetaboxError("failed to load file contents from wayback/petabox (ReadTimeoutError: {})".format(rte)) + except ValueError as ve: + raise PetaboxError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) + except EOFError as eofe: + raise PetaboxError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) + except TypeError as te: + raise PetaboxError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) + except Exception as e: + if "while decompressing data: invalid block type" in str(e): + raise PetaboxError("decompression error fetching WARC record; usually due to bad alexa ARC files") + else: + raise e + # Note: could consider a generic "except Exception" here, as we get so + # many petabox errors. Do want jobs to fail loud and clear when the + # whole cluster is down though. + + try: + status_code = gwb_record.get_status()[0] + except http.client.HTTPException: + raise WaybackContentError("too many HTTP headers (in wayback fetch)") + location = gwb_record.get_location() or None + + if status_code is None and gwb_record.target_uri.startswith(b"ftp://") and not gwb_record.is_revisit(): + # TODO: some additional verification here? 
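+            # note: 226 is the FTP "transfer complete" code; sandcrawler uses
+            # it as the stand-in status for successful FTP captures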
+ status_code = 226 + + body = None + revisit_cdx = None + if gwb_record.is_revisit(): + if not resolve_revisit: + raise WaybackContentError("found revisit record, but won't resolve (loop?)") + revisit_uri, revisit_dt = gwb_record.refers_to + if not (revisit_uri and revisit_dt): + raise WaybackContentError("revisit record missing URI and/or DT: warc:{} offset:{}".format( + warc_path, offset)) + # convert revisit_dt + # len("2018-07-24T11:56:49"), or with "Z" + assert len(revisit_dt) in (19, 20) + if type(revisit_uri) is bytes: + revisit_uri = revisit_uri.decode('utf-8') + if type(revisit_dt) is bytes: + revisit_dt = revisit_dt.decode('utf-8') + revisit_dt = revisit_dt.replace('-', '').replace(':', '').replace('T', '').replace('Z', '') + assert len(revisit_dt) == 14 + try: + revisit_cdx = self.cdx_client.fetch(revisit_uri, revisit_dt) + body = self.fetch_petabox_body( + csize=revisit_cdx.warc_csize, + offset=revisit_cdx.warc_offset, + warc_path=revisit_cdx.warc_path, + resolve_revisit=False, + expected_status_code=revisit_cdx.status_code, + ) + except KeyError as ke: + raise WaybackError("Revist resolution failed: {}".format(ke)) + elif status_code in (200, 226): + try: + body = gwb_record.open_raw_content().read() + except IncompleteRead as ire: + raise WaybackError( + "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) + elif status_code is None: + raise WaybackContentError( + "got a None status_code in (W)ARC record") + return WarcResource( + status_code=status_code, + location=location, + body=body, + revisit_cdx=revisit_cdx, + ) + + def fetch_petabox_body(self, csize, offset, warc_path, resolve_revisit=True, expected_status_code=None): + """ + Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize. + + Returns bytes. Raises KeyError if resource wasn't an HTTP 200. + + Thin helper around fetch_petabox() + """ + resource = self.fetch_petabox( + csize=csize, + offset=offset, + warc_path=warc_path, + resolve_revisit=resolve_revisit, + ) + + if expected_status_code: + if expected_status_code != resource.status_code: + raise KeyError("archived HTTP response (WARC) was not {}: {}".format( + expected_status_code, + resource.status_code, + ) + ) + elif resource.status_code not in (200, 226): + raise KeyError("archived HTTP response (WARC) was not 200: {}".format( + resource.status_code) + ) + + return resource.body + + def fetch_replay_body(self, url, datetime, cdx_sha1hex=None): + """ + Fetches an HTTP 200 record from wayback via the replay interface + (web.archive.org) instead of petabox. + + Intended for use with SPN2 requests, where request body has not ended + up in petabox yet. + + If cdx_sha1hex is passed, will try to verify fetched body. Note that + this check *won't work* in many cases, due to CDX hash being of + compressed transfer data, not the uncompressed final content bytes. + + TODO: could instead try to verify that we got the expected replay body + using... new X-Archive headers? 
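+
+        A minimal call sketch (the URL and 14-digit wayback timestamp are
+        illustrative):
+
+            body = wayback_client.fetch_replay_body(
+                url="https://example.com/paper.pdf",
+                datetime="20200101123000",
+            )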
+ """ + + # defensively check datetime format + assert len(datetime) == 14 + assert datetime.isdigit() + + try: + resp = requests.get( + self.wayback_endpoint + datetime + "id_/" + url, + allow_redirects=False, + headers=self.replay_headers, + ) + except requests.exceptions.TooManyRedirects: + raise WaybackContentError("redirect loop (wayback replay fetch)") + except requests.exceptions.ChunkedEncodingError: + raise WaybackError("ChunkedEncodingError (wayback replay fetch)") + except UnicodeDecodeError: + raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url)) + + try: + resp.raise_for_status() + except Exception as e: + raise WaybackError(str(e)) + #print(resp.url, file=sys.stderr) + + # defensively check that this is actually correct replay based on headers + if not "X-Archive-Src" in resp.headers: + raise WaybackError("replay fetch didn't return X-Archive-Src in headers") + if not datetime in resp.url: + raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url)) + + if cdx_sha1hex: + # verify that body matches CDX hash + # TODO: don't need *all* these hashes, just sha1 + file_meta = gen_file_metadata(resp.content) + if cdx_sha1hex != file_meta['sha1hex']: + print(" REPLAY MISMATCH: cdx:{} replay:{}".format( + cdx_sha1hex, + file_meta['sha1hex']), + file=sys.stderr) + raise WaybackContentError("replay fetch body didn't match CDX hash cdx:{} body:{}".format( + cdx_sha1hex, + file_meta['sha1hex']), + ) + return resp.content + + def fetch_replay_redirect(self, url, datetime): + """ + Fetches an HTTP 3xx redirect Location from wayback via the replay interface + (web.archive.org) instead of petabox. + + Intended for use with SPN2 requests, where request body has not ended + up in petabox yet. For example, re-ingesting a base_url which was + recently crawler by SPNv2, where we are doing ingest via wayback path. + + Returns None if response is found, but couldn't find redirect. + """ + + # defensively check datetime format + assert len(datetime) == 14 + assert datetime.isdigit() + + try: + resp = requests.get( + self.wayback_endpoint + datetime + "id_/" + url, + allow_redirects=False, + headers=self.replay_headers, + ) + except requests.exceptions.TooManyRedirects: + raise WaybackContentError("redirect loop (wayback replay fetch)") + except UnicodeDecodeError: + raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url)) + try: + resp.raise_for_status() + except Exception as e: + raise WaybackError(str(e)) + #print(resp.url, file=sys.stderr) + + # defensively check that this is actually correct replay based on headers + # previously check for "X-Archive-Redirect-Reason" here + if not "X-Archive-Src" in resp.headers: + raise WaybackError("redirect replay fetch didn't return X-Archive-Src in headers") + if not datetime in resp.url: + raise WaybackError("didn't get exact reply (redirect?) 
datetime:{} got:{}".format(datetime, resp.url)) + + redirect_url = resp.headers.get("Location") + # eg, https://web.archive.org/web/20200111003923id_/https://dx.doi.org/10.17504/protocols.io.y2gfybw + #print(redirect_url, file=sys.stderr) + if redirect_url and redirect_url.startswith("https://web.archive.org/web/"): + redirect_url = "/".join(redirect_url.split("/")[5:]) + #print(redirect_url, file=sys.stderr) + if redirect_url and redirect_url.startswith("http"): + redirect_url = clean_url(redirect_url) + return redirect_url + else: + return None + + def lookup_resource(self, start_url, best_mimetype=None, closest=None): + """ + Looks in wayback for a resource starting at the URL, following any + redirects. Returns a ResourceResult object, which may indicate a + failure to fetch the resource. + + Only raises exceptions on remote service failure or unexpected + problems. + + In a for loop: + + lookup "best" CDX + redirect status code? + fetch wayback + continue + success (200)? + fetch wayback + return success + bad (other status)? + return failure + + got to end? + return failure; too many redirects + """ + next_url = start_url + urls_seen = [start_url] + for i in range(self.max_redirects): + print(" URL: {}".format(next_url), file=sys.stderr) + cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype, closest=closest) + #print(cdx_row, file=sys.stderr) + if not cdx_row: + return ResourceResult( + start_url=start_url, + hit=False, + status="no-capture", + terminal_url=next_url, + terminal_dt=None, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + + # first try straight-forward redirect situation + if cdx_row.mimetype == "warc/revisit" and '/' in cdx_row.warc_path: + resource = self.fetch_petabox( + csize=cdx_row.warc_csize, + offset=cdx_row.warc_offset, + warc_path=cdx_row.warc_path, + ) + if resource.revisit_cdx and resource.revisit_cdx.status_code in (200, 226): + return ResourceResult( + start_url=start_url, + hit=True, + status="success", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=resource.revisit_cdx.status_code, + body=resource.body, + cdx=cdx_row, + revisit_cdx=resource.revisit_cdx, + ) + # else, continue processing with revisit record + + if cdx_row.status_code in (200, 226): + revisit_cdx = None + if '/' in cdx_row.warc_path: + resource = self.fetch_petabox( + csize=cdx_row.warc_csize, + offset=cdx_row.warc_offset, + warc_path=cdx_row.warc_path, + ) + body = resource.body + revisit_cdx = resource.revisit_cdx + else: + body = self.fetch_replay_body( + url=cdx_row.url, + datetime=cdx_row.datetime, + ) + cdx_row = cdx_partial_from_row(cdx_row) + return ResourceResult( + start_url=start_url, + hit=True, + status="success", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=body, + cdx=cdx_row, + revisit_cdx=revisit_cdx, + ) + elif 300 <= (cdx_row.status_code or 0) < 400: + if '/' in cdx_row.warc_path: + resource = self.fetch_petabox( + csize=cdx_row.warc_csize, + offset=cdx_row.warc_offset, + warc_path=cdx_row.warc_path, + resolve_revisit=False, + ) + assert 300 <= resource.status_code < 400 + if not resource.location: + print(" bad redirect record: {}".format(cdx_row), file=sys.stderr) + return ResourceResult( + start_url=start_url, + hit=False, + status="bad-redirect", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=None, + cdx=cdx_row, + revisit_cdx=None, + ) + if not "://" 
in resource.location: + next_url = urllib.parse.urljoin(next_url, resource.location) + else: + next_url = resource.location + if next_url: + next_url = clean_url(next_url) + else: + next_url = self.fetch_replay_redirect( + url=cdx_row.url, + datetime=cdx_row.datetime, + ) + if next_url: + next_url = clean_url(next_url) + cdx_row = cdx_partial_from_row(cdx_row) + if not next_url: + print(" bad redirect record: {}".format(cdx_row), file=sys.stderr) + return ResourceResult( + start_url=start_url, + hit=False, + status="bad-redirect", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=None, + cdx=cdx_row, + revisit_cdx=None, + ) + if next_url in urls_seen: + return ResourceResult( + start_url=start_url, + hit=False, + status="redirect-loop", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=None, + cdx=cdx_row, + revisit_cdx=None, + ) + urls_seen.append(next_url) + continue + else: + return ResourceResult( + start_url=start_url, + hit=False, + status="terminal-bad-status", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=None, + cdx=cdx_row, + revisit_cdx=None, + ) + return ResourceResult( + start_url=start_url, + hit=False, + status="redirects-exceeded", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=None, + cdx=cdx_row, + revisit_cdx=None, + ) + + +class SavePageNowError(Exception): + pass + +class SavePageNowBackoffError(SandcrawlerBackoffError): + pass + +SavePageNowResult = namedtuple('SavePageNowResult', [ + 'success', + 'status', + 'job_id', + 'request_url', + 'terminal_url', + 'terminal_dt', + 'resources', +]) + +class SavePageNowClient: + + def __init__(self, v2endpoint="https://web.archive.org/save", **kwargs): + self.ia_access_key = kwargs.get('ia_access_key', + os.environ.get('IA_ACCESS_KEY')) + self.ia_secret_key = kwargs.get('ia_secret_key', + os.environ.get('IA_SECRET_KEY')) + self.v2endpoint = v2endpoint + self.v2_session = requests_retry_session(retries=5, backoff_factor=3) + self.v2_session.headers.update({ + 'User-Agent': 'Mozilla/5.0 sandcrawler.SavePageNowClient', + 'Accept': 'application/json', + 'Authorization': 'LOW {}:{}'.format(self.ia_access_key, self.ia_secret_key), + }) + + # 3 minutes total + self.poll_count = 60 + self.poll_seconds = 3.0 + + self.spn_cdx_retry_sec = kwargs.get('spn_cdx_retry_sec', 9.0) + + def save_url_now_v2(self, request_url, force_simple_get=0, capture_outlinks=0): + """ + Returns a "SavePageNowResult" (namedtuple) if SPN request was processed + at all, or raises an exception if there was an error with SPN itself. + + If SPN2 was unable to fetch the remote content, `success` will be + false and status will be indicated. + + SavePageNowResult fields: + - success: boolean if SPN + - status: "success" or an error message/type + - job_id: returned by API + - request_url: url we asked to fetch + - terminal_url: final primary resource (after any redirects) + - terminal_dt: wayback timestamp of final capture + - resources: list of all URLs captured + + TODO: parse SPN error codes (status string) and handle better. Eg, + non-200 remote statuses, invalid hosts/URLs, timeouts, backoff, etc. 
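+
+        A hedged usage sketch (assumes IA_ACCESS_KEY/IA_SECRET_KEY are
+        configured; the client variable and URL are illustrative):
+
+            spn_client = SavePageNowClient()
+            result = spn_client.save_url_now_v2("https://example.com/article.pdf")
+            if result.success:
+                print(result.terminal_url, result.terminal_dt)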
+ """ + if capture_outlinks: + print(" capturing outlinks!", file=sys.stderr) + if not (self.ia_access_key and self.ia_secret_key): + raise Exception("SPN2 requires authentication (IA_ACCESS_KEY/IA_SECRET_KEY)") + if request_url.startswith("ftp://"): + return SavePageNowResult( + False, + "spn2-no-ftp", + None, + request_url, + None, + None, + None, + ) + resp = self.v2_session.post( + self.v2endpoint, + data={ + 'url': request_url, + 'capture_all': 1, + 'capture_outlinks': capture_outlinks, + 'capture_screenshot': 0, + 'if_not_archived_within': '1d', + 'force_get': force_simple_get, + 'skip_first_archive': 1, + 'outlinks_availability': 0, + 'js_behavior_timeout': 0, + }, + ) + if resp.status_code == 429: + raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url)) + elif resp.status_code != 200: + raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url)) + resp_json = resp.json() + + if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']: + raise SavePageNowBackoffError(resp_json['message']) + elif not resp_json or 'job_id' not in resp_json: + raise SavePageNowError( + "Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json)) + + job_id = resp_json['job_id'] + print(f" SPNv2 running: job_id={job_id} url={request_url}", file=sys.stderr) + + # poll until complete + final_json = None + for i in range(self.poll_count): + resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, resp_json['job_id'])) + try: + resp.raise_for_status() + except: + raise SavePageNowError(resp.content) + status = resp.json()['status'] + if status == 'pending': + time.sleep(self.poll_seconds) + elif status in ('success', 'error'): + final_json = resp.json() + break + else: + raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(status, request_url)) + + if not final_json: + raise SavePageNowError("SPN2 timed out (polling count exceeded)") + + # if there was a recent crawl of same URL, fetch the status of that + # crawl to get correct datetime + if final_json.get('original_job_id'): + print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", file=sys.stderr) + resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, final_json['original_job_id'])) + try: + resp.raise_for_status() + except: + raise SavePageNowError(resp.content) + final_json = resp.json() + + #print(final_json, file=sys.stderr) + + if final_json['status'] == "success": + if final_json.get('original_url').startswith('/'): + print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", file=sys.stderr) + return SavePageNowResult( + True, + "success", + job_id, + request_url, + final_json['original_url'], + final_json['timestamp'], + final_json['resources'], + ) + else: + if final_json['status'] == 'pending': + final_json['status'] = 'error:pending' + return SavePageNowResult( + False, + final_json.get('status_ext') or final_json['status'], + job_id, + request_url, + None, + None, + None, + ) + + def crawl_resource(self, start_url, wayback_client, force_simple_get=0): + """ + Runs a SPN2 crawl, then fetches body. + + There is a delay between SPN2 crawls and WARC upload to petabox, so we + need to fetch the body via wayback replay instead of petabox + range-request. 
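+
+        A minimal sketch of the intended call pattern (client variables and
+        URL are illustrative):
+
+            wayback_client = WaybackClient()
+            spn_client = SavePageNowClient()
+            resource = spn_client.crawl_resource("https://example.com/paper", wayback_client)
+            if resource.hit:
+                body = resource.body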
+ """ + + # HACK: capture CNKI domains with outlinks (for COVID-19 crawling) + if 'gzbd.cnki.net/' in start_url: + spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get, capture_outlinks=1) + else: + spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get) + + if not spn_result.success: + status = spn_result.status + if status in ("error:invalid-url", "error:not-found", + "error:invalid-host-resolution", "error:gateway-timeout", + "error:too-many-redirects", "error:read-timeout"): + status = status.replace("error:", "") + elif status in ("error:no-access", "error:forbidden"): + status = "forbidden" + elif status == "error:user-session-limit": + raise SavePageNowBackoffError("SPNv2 user-session-limit") + elif status == "error:internal-server-error": + status = "remote-server-error" + elif status.startswith("error:"): + status = "spn2-" + status + # despite other errors, call these a failure (so we don't retry) + if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1")): + status = "blocked-cookie" + return ResourceResult( + start_url=start_url, + hit=False, + status=status, + terminal_url=spn_result.terminal_url, + terminal_dt=spn_result.terminal_dt, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + #print(spn_result, file=sys.stderr) + + # detect partial URL response (aka, success, but missing full URL) + if not "://" in spn_result.terminal_url or spn_result.terminal_url.startswith('/'): + return ResourceResult( + start_url=start_url, + hit=False, + status="spn2-success-partial-url", + terminal_url=spn_result.terminal_url, + terminal_dt=spn_result.terminal_dt, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + + # don't try to CDX fetch for this common cookie block terminal + if spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"): + return ResourceResult( + start_url=start_url, + hit=False, + status="blocked-cookie", + terminal_url=spn_result.terminal_url, + terminal_dt=spn_result.terminal_dt, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + + cdx_row = None + # hack to work around elsevier weirdness + if "://pdf.sciencedirectassets.com/" in spn_result.request_url: + elsevier_pdf_cdx = wayback_client.cdx_client.lookup_best( + spn_result.request_url, + best_mimetype="application/pdf", + ) + if elsevier_pdf_cdx and elsevier_pdf_cdx.mimetype == "application/pdf": + print(" Trying pdf.sciencedirectassets.com hack!", file=sys.stderr) + cdx_row = elsevier_pdf_cdx + else: + print(" Failed pdf.sciencedirectassets.com hack!", file=sys.stderr) + #print(elsevier_pdf_cdx, file=sys.stderr) + + if not cdx_row: + # lookup exact + try: + filter_status_code = None + if spn_result.terminal_url.startswith("ftp://"): + filter_status_code = 226 + cdx_row = wayback_client.cdx_client.fetch( + url=spn_result.terminal_url, + datetime=spn_result.terminal_dt, + filter_status_code=filter_status_code, + retry_sleep=self.spn_cdx_retry_sec, + ) + # sometimes there are fuzzy http/https self-redirects with the + # same SURT; try to work around that + if cdx_row.status_code >= 300 and cdx_row.status_code < 400: + cdx_row = wayback_client.cdx_client.fetch( + url=spn_result.terminal_url, + datetime=spn_result.terminal_dt, + filter_status_code=200, + retry_sleep=self.spn_cdx_retry_sec, + ) + except KeyError as ke: + print(" CDX KeyError: {}".format(ke), 
file=sys.stderr) + return ResourceResult( + start_url=start_url, + hit=False, + status="spn2-cdx-lookup-failure", + terminal_url=spn_result.terminal_url, + terminal_dt=spn_result.terminal_dt, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + + #print(cdx_row, file=sys.stderr) + + revisit_cdx = None + if '/' in cdx_row.warc_path: + # Usually can't do this kind of direct fetch because CDX result is recent/live + resource = wayback_client.fetch_petabox( + csize=cdx_row.warc_csize, + offset=cdx_row.warc_offset, + warc_path=cdx_row.warc_path, + ) + body = resource.body + if resource.revisit_cdx: + assert resource.revisit_cdx.sha1hex == cdx_row.sha1hex + revisit_cdx = resource.revisit_cdx + else: + # note: currently not trying to verify cdx_row.sha1hex + try: + body = wayback_client.fetch_replay_body( + url=cdx_row.url, + datetime=cdx_row.datetime, + ) + except (WaybackError, WaybackContentError) as we: + return ResourceResult( + start_url=start_url, + hit=False, + status="spn2-wayback-error", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=None, + body=None, + cdx=None, + revisit_cdx=None, + ) + # warc_path etc will change, so strip them out + cdx_row = cdx_partial_from_row(cdx_row) + + assert cdx_row.status_code + if cdx_row.status_code in (200, 226): + return ResourceResult( + start_url=start_url, + hit=True, + status="success", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=body, + cdx=cdx_row, + revisit_cdx=revisit_cdx, + ) + else: + return ResourceResult( + start_url=start_url, + hit=False, + status="terminal-bad-status", + terminal_url=cdx_row.url, + terminal_dt=cdx_row.datetime, + terminal_status_code=cdx_row.status_code, + body=body, + cdx=cdx_row, + revisit_cdx=revisit_cdx, + ) + + +def fix_transfer_encoding(file_meta: dict, resource: ResourceResult) -> Tuple[dict, ResourceResult]: + if resource.body and file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip': + print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr) + inner_body = gzip.decompress(resource.body) + if not inner_body: + raise Exception("null body inside transfer encoding") + inner_resource = ResourceResult( + body=inner_body, + # copy all other fields + start_url=resource.start_url, + hit=resource.hit, + status=resource.status, + terminal_url=resource.terminal_url, + terminal_dt=resource.terminal_dt, + terminal_status_code=resource.terminal_status_code, + cdx=resource.cdx, + revisit_cdx=resource.revisit_cdx, + ) + inner_file_meta = gen_file_metadata(inner_resource.body) + return (inner_file_meta, inner_resource) + else: + return (file_meta, resource) diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py new file mode 100644 index 0000000..b852c69 --- /dev/null +++ b/python/sandcrawler/ingest.py @@ -0,0 +1,833 @@ + +import sys +import json +import gzip +import time +import base64 +import xml.etree.ElementTree +from collections import namedtuple +from typing import Optional, Tuple, Any, Dict, List +from http.server import BaseHTTPRequestHandler, HTTPServer + +import requests +from selectolax.parser import HTMLParser + +from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding, NoCaptureError +from sandcrawler.grobid import GrobidClient +from 
sandcrawler.pdfextract import process_pdf, PdfExtractResult +from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime +from sandcrawler.html import extract_fulltext_url +from sandcrawler.html_ingest import fetch_html_resources, \ + quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \ + WebResource, html_guess_platform +from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules +from sandcrawler.workers import SandcrawlerWorker +from sandcrawler.db import SandcrawlerPostgrestClient +from sandcrawler.xml import xml_reserialize + + +MAX_BODY_SIZE_BYTES = 128*1024*1024 + +class IngestFileWorker(SandcrawlerWorker): + """ + High level flow is to look in history first, then go to live web if + resource not found. Following redirects is treated as "fetching a + resource". Current version fetches a single resource; if it isn't a hit + but is an HTML 200, treats it as a landing page, tries to extract + fulltext link, then fetches that resource. + + process(request, key=None) -> response + Does all the things! + + Check existing processing (short circuit): + + check_existing_ingest(base_url) -> ingest_file_result or none + process_existing(result) -> response + try fetching all the rows we want. if any don't exist, fetch the resource itself and call process_hit() + + Fetch resource: + + find_resource(url) -> ResourceResult + + Process resource: + + process_hit(ResourceResult) -> response + process_grobid(ResourceResult) + """ + + def __init__(self, sink=None, **kwargs): + super().__init__() + + self.sink = sink + self.wayback_client = kwargs.get('wayback_client') + if not self.wayback_client: + self.wayback_client = WaybackClient() + self.spn_client = kwargs.get('spn_client') + if not self.spn_client: + self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) + self.grobid_client = kwargs.get('grobid_client') + if not self.grobid_client: + self.grobid_client = GrobidClient() + self.pgrest_client = kwargs.get('pgrest_client') + if not self.pgrest_client: + self.pgrest_client = SandcrawlerPostgrestClient() + self.grobid_sink = kwargs.get('grobid_sink') + self.thumbnail_sink = kwargs.get('thumbnail_sink') + self.pdftext_sink = kwargs.get('pdftext_sink') + self.xmldoc_sink = kwargs.get('xmldoc_sink') + self.htmlteixml_sink = kwargs.get('htmlteixml_sink') + self.max_hops = 6 + + self.try_existing_ingest = kwargs.get('try_existing_ingest', False) + self.try_existing_grobid = kwargs.get('try_existing_grobid', True) + self.try_existing_pdfextract = kwargs.get('try_existing_pdfextract', True) + self.try_wayback = kwargs.get('try_wayback', True) + self.try_spn2 = kwargs.get('try_spn2', True) + self.html_quick_mode = kwargs.get('html_quick_mode', False) + self.adblock_rules = load_adblock_rules() + self.max_html_resources = 200 + + self.base_url_blocklist = [ + # robot blocking + "://hkvalidate.perfdrive.com/", + + # temporary, until we implement specific fetch and 'petabox' output + "://archive.org/", + "://www.archive.org/", + "://web.archive.org/web/", + + # out of scope + "://openlibrary.org/", + "://www.openlibrary.org/", + "://fatcat.wiki/", + "://orcid.org/", + "://doaj.org/", + + # Domain squats + "://bartandjones.com", + "://ijretm.com", + "://ijrcemas.com", + "://jist.net.in", + "://croisements-revue.org", + + # all stubs/previews, not full papers + "://page-one.live.cf.public.springer.com", + + # large datasets-only (no PDF expected) + "plutof.ut.ee/", + 
"www.gbif.org/", + "doi.pangaea.de/", + "www.plate-archive.org/", + "://doi.org/10.25642/ipk/gbis/", + "://apex.ipk-gatersleben.de/", + "fao.org/glis/", + + # Historical non-paper content: + "dhz.uni-passau.de/", # newspapers + "digital.ucd.ie/", # ireland national historical + + # DOI prefixes + "doi.org/10.2307/", # JSTOR; slow and many redirects + "doi.org/10.18730/", # fao.org: database entry + "doi.org/10.15468/", # gbif.org: database entry + + # deprecated domain (doesn't redirect correctly) + "://edoc.mpg.de/", + ] + + self.wall_blocklist = [ + # loginwall + "://profile.thieme.de/HTML/sso/ejournals/login.htm", + "://login.bepress.com/", + "?SAMLRequest=", + "://osapublishing.org/captcha/", + "/password-login", + "://gateway.isiknowledge.com/", + "/login?TARGET=", + ] + + self.cookie_blocklist = [ + "/cookieAbsent", + "cookieSet=1", + "error=cookies_not_supported", + ] + + # these are special-case web domains for which we want SPN2 to not run + # a headless browser (brozzler), but instead simply run wget. + # the motivation could be to work around browser issues, or in the + # future possibly to increase download efficiency (wget/fetch being + # faster than browser fetch) + self.spn2_simple_get_domains = [ + # direct PDF links + "://arxiv.org/pdf/", + "://europepmc.org/backend/ptpmcrender.fcgi", + "://pdfs.semanticscholar.org/", + "://res.mdpi.com/", + + # platform sites + "://zenodo.org/", + "://figshare.org/", + "://springernature.figshare.com/", + + # popular simple cloud storage or direct links + "://s3-eu-west-1.amazonaws.com/", + ] + + self.src_valid_mimetypes = [ + "text/x-tex", + "application/gzip", + "application/x-bzip", + "application/x-bzip2", + "application/zip", + "application/x-tar", + "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ] + + self.component_valid_mimetypes = [ + "image/jpeg", + "image/tiff", + "image/png", + "image/gif", + "audio/mpeg", + "video/mp4", + "video/mpeg", + "text/plain", + "text/csv", + "application/json", + "application/xml", + "application/pdf", + "application/gzip", + "application/x-bzip", + "application/x-bzip2", + "application/zip ", + "application/x-rar ", + "application/x-7z-compressed", + "application/x-tar", + "application/vnd.ms-powerpoint", + "application/vnd.ms-excel", + "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ] + + + def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: + """ + Check in sandcrawler-db (postgres) to see if we have already ingested + this URL (ingest file result table). + + Returns existing row *if* found *and* we should use it, otherwise None. + + Looks at existing ingest results and makes a decision based on, eg, + status and timestamp. + """ + if not self.try_existing_ingest: + return None + existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url) + # TODO: filter on more flags? + if existing and existing['hit'] == True: + return existing + else: + return None + + def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]: + """ + Looks in wayback for a resource starting at the URL, following any + redirects. If a hit isn't found, try crawling with SPN. 
+ """ + via = "none" + resource = None + + if url.startswith("http://web.archive.org/web/") or url.startswith("https://web.archive.org/web/"): + raise NotImplementedError("handling direct wayback links not supported yet") + + if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"): + raise NotImplementedError("fetching from archive.org not implemented yet") + + if self.try_wayback and not force_recrawl: + via = "wayback" + resource = self.wayback_client.lookup_resource(url, best_mimetype) + + # check for "soft 404" conditions, where we should retry with live SPNv2 + soft404 = False + # NOTE: these are often not working with SPNv2 either, so disabling. If + # we really want to try again, should do force-recrawl + #if resource and resource.hit and resource.terminal_url.endswith('/cookieAbsent'): + # soft404 = True + + old_failure = False + if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000': + old_failure = True + + if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure): + via = "spn2" + force_simple_get = 0 + for domain in self.spn2_simple_get_domains: + if domain in url: + force_simple_get = 1 + break + resource = self.spn_client.crawl_resource(url, self.wayback_client, force_simple_get=force_simple_get) + print("[FETCH {:>6}] {} {}".format( + via, + (resource and resource.status), + (resource and resource.terminal_url) or url), + file=sys.stderr) + return resource + + def process_existing(self, request: dict, result_row: dict) -> dict: + """ + If we have an existing ingest file result, do any database fetches or + additional processing necessary to return a result. + """ + raise NotImplementedError("process_existing() not tested or safe yet") + assert result_row['hit'] + existing_file_meta = self.pgrest_client.get_file_meta(result_row['terminal_sha1hex']) + existing_grobid = self.pgrest_client.get_grobid(result_row['terminal_sha1hex']) + existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], result_row['terminal_dt']) + if not (existing_file_meta and existing_grobid and existing_cdx): + raise NotImplementedError("partially-exsiting records not implemented yet") + result = { + 'hit': result_row['hit'], + 'status': "existing", + 'request': request, + 'grobid': existing_grobid, + 'file_meta': existing_file_meta, + 'cdx': existing_cdx, + 'terminal': { + 'terminal_url': result_row['terminal_url'], + 'terminal_dt': result_row['terminal_dt'], + 'terminal_status_code': result_row['terminal_status_code'], + 'terminal_sha1hex': result_row['terminal_sha1hex'], + }, + } + return result + + def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict: + """ + Run all the necessary processing for a new/fresh ingest hit. 
+ """ + if ingest_type == "pdf": + return { + 'grobid': self.process_grobid(resource, file_meta), + 'pdf_meta': self.process_pdfextract(resource, file_meta), + } + elif ingest_type == "xml": + return { + 'xml_meta': self.process_xml(resource, file_meta), + } + elif ingest_type == "html": + html_info = self.process_html(resource, file_meta) + # if there is no html_biblio, don't clobber anything possibly extracted earlier + if 'html_biblio' in html_info and not html_info['html_biblio']: + html_info.pop('html_biblio') + return html_info + elif ingest_type == "src": + return {} + elif ingest_type == "component": + return {} + else: + raise NotImplementedError(f"process {ingest_type} hit") + + def process_grobid(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Submits to resource body to GROBID for processing. + + TODO: By default checks sandcrawler-db for an existing row first, then + decide if we should re-process + """ + if self.try_existing_grobid: + existing = self.pgrest_client.get_grobid(file_meta['sha1hex']) + if existing: + print("found existing GROBID result", file=sys.stderr) + return existing + + # Need to actually processes + result = self.grobid_client.process_fulltext(resource.body) + if self.grobid_sink: + # extra fields for GROBID kafka messages + result['file_meta'] = file_meta + result['key'] = result['file_meta']['sha1hex'] + self.grobid_sink.push_record(result.copy()) + if result['status'] == "success": + metadata = self.grobid_client.metadata(result) + if metadata: + result['metadata'] = self.grobid_client.metadata(result) + result['fatcat_release'] = result['metadata'].pop('fatcat_release', None) + result['grobid_version'] = result['metadata'].pop('grobid_version', None) + result.pop('tei_xml', None) + result.pop('file_meta', None) + result.pop('key', None) + return result + + def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Extracts thumbnail and pdf_meta info from PDF. + + By default checks sandcrawler-db for an existing row first, then decide + if we should re-process. + + TODO: difference between Kafka schema and SQL/postgrest schema + """ + if self.try_existing_pdfextract: + existing = self.pgrest_client.get_pdf_meta(file_meta['sha1hex']) + if existing: + print("found existing pdf_meta result", file=sys.stderr) + result = PdfExtractResult.from_pdf_meta_dict(existing) + return result.to_pdftext_dict() + + # Need to actually processes + result = process_pdf(resource.body) + assert result.file_meta['sha1hex'] == file_meta['sha1hex'] + if self.thumbnail_sink and result.page0_thumbnail is not None: + self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) + if self.pdftext_sink: + self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex) + result.page0_thumbnail = None + result.text = None + result.file_meta = None + return result.to_pdftext_dict() + + def process_xml(self, resource: ResourceResult, file_meta: dict) -> dict: + """ + Simply publishes to Kafka topic. + + In the future, could extract other metadata here (like body word + count), or attempting to fetch sub-resources. 
+ """ + if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml": + try: + jats_xml = xml_reserialize(resource.body) + except xml.etree.ElementTree.ParseError: + return dict(status="xml-parse-error") + msg = dict( + sha1hex=file_meta["sha1hex"], + status="success", + jats_xml=jats_xml, + ) + self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex']) + return dict(status="success") + + def process_html(self, resource: ResourceResult, file_meta: dict) -> dict: + + assert resource.body + try: + html_doc = HTMLParser(resource.body) + except ValueError as ve: + return dict( + status="html-selectolax-error", + ) + html_biblio = html_extract_biblio(resource.terminal_url, html_doc) + assert html_biblio + html_body = html_extract_body_teixml(resource.body) + html_platform = html_guess_platform(resource.terminal_url, html_doc, html_biblio) + html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('word_count')) + html_biblio_dict = json.loads(html_biblio.json(exclude_none=True)) + + if html_scope in ('blocked-captcha','blocked-cookie','blocked-forbidden'): + return dict( + status=html_scope, + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + ) + elif html_scope not in ('article-fulltext','unknown',): + html_body.pop("tei_xml", None) + return dict( + status="wrong-scope", + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_body=html_body, + ) + + raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules) + if len(raw_resources) > self.max_html_resources: + html_body.pop("tei_xml", None) + return dict( + status="too-many-resources", + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_body=html_body, + ) + + if self.htmlteixml_sink and html_body['status'] == "success": + self.htmlteixml_sink.push_record(html_body, key=file_meta['sha1hex']) + + html_body.pop("tei_xml", None) + + partial_result = dict( + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_body=html_body, + ) + + when = parse_cdx_datetime(resource.cdx.datetime) + full_resources: List[WebResource] = [] + + try: + if self.html_quick_mode: + print(" WARN: running quick CDX-only fetches", file=sys.stderr) + full_resources = quick_fetch_html_resources(raw_resources, self.wayback_client.cdx_client, when) + else: + full_resources = fetch_html_resources(raw_resources, self.wayback_client, when) + except PetaboxError as e: + partial_result['status'] = 'petabox-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except CdxApiError as e: + partial_result['status'] = 'cdx-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except WaybackError as e: + partial_result['status'] = 'wayback-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except WaybackContentError as e: + partial_result['status'] = 'wayback-content-error' + partial_result['error_message'] = str(e)[:1600] + return partial_result + except NoCaptureError as e: + partial_result['status'] = 'html-resource-no-capture' + partial_result['error_message'] = str(e)[:1600] + return partial_result + + info = dict( + html_body=html_body, + html_biblio=html_biblio_dict, + scope=html_scope, + platform=html_platform, + html_resources=[json.loads(r.json(exclude_none=True)) for r in full_resources], + ) + if html_scope == 'unknown': + info['status'] = 'unknown-scope' + return info + + def 
timeout_response(self, task: dict) -> dict: + print("[TIMEOUT]", file=sys.stderr) + return dict( + request=task, + hit=False, + status="timeout", + error_message="ingest worker internal timeout", + ) + + def want(self, request: dict) -> bool: + if not request.get('ingest_type') in ('file', 'pdf', 'xml', 'html', 'src', 'component'): + return False + return True + + def process(self, request: dict, key: Any = None) -> dict: + + # old backwards compatibility + if request.get('ingest_type') == 'file': + request['ingest_type'] = 'pdf' + + ingest_type = request.get('ingest_type') + if ingest_type not in ("pdf", "xml", "html", "src", "component"): + raise NotImplementedError(f"can't handle ingest_type={ingest_type}") + + # parse/clean URL + # note that we pass through the original/raw URL, and that is what gets + # persisted in database table + base_url = clean_url(request['base_url']) + + force_recrawl = bool(request.get('force_recrawl', False)) + + for block in self.base_url_blocklist: + if block in base_url: + print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) + return dict(request=request, hit=False, status="skip-url-blocklist") + + print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) + + best_mimetype = None + if ingest_type == "pdf": + best_mimetype = "application/pdf" + elif ingest_type == "xml": + best_mimetype = "text/xml" + elif ingest_type == "html": + best_mimetype = "text/html" + elif ingest_type == "src": + best_mimetype = "application/gzip" + + existing = self.check_existing_ingest(ingest_type, base_url) + if existing: + return self.process_existing(request, existing) + + result: Dict[str, Any] = dict(request=request, hit=False) + + next_url = base_url + hops = [base_url] + + while len(hops) <= self.max_hops: + + result['hops'] = hops + + # check against blocklist again on each hop + for block in self.base_url_blocklist: + if block in next_url: + result['status'] = "skip-url-blocklist" + return result + + # check against known loginwall URLs + for block in self.wall_blocklist: + if block in next_url: + # TODO: blocked-wall instead of skip-wall + result['status'] = "skip-wall" + return result + + # check for popular cookie blocking URL patterns. 
On successful SPN + # crawls, shouldn't see these redirect URLs + for pattern in self.cookie_blocklist: + if pattern in next_url: + result['status'] = 'blocked-cookie' + return result + + try: + resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl) + except SavePageNowError as e: + result['status'] = 'spn2-error' + result['error_message'] = str(e)[:1600] + return result + except PetaboxError as e: + result['status'] = 'petabox-error' + result['error_message'] = str(e)[:1600] + return result + except CdxApiError as e: + result['status'] = 'cdx-error' + result['error_message'] = str(e)[:1600] + # add a sleep in cdx-error path as a slow-down + time.sleep(2.0) + return result + except WaybackError as e: + result['status'] = 'wayback-error' + result['error_message'] = str(e)[:1600] + return result + except WaybackContentError as e: + result['status'] = 'wayback-content-error' + result['error_message'] = str(e)[:1600] + return result + except NotImplementedError as e: + result['status'] = 'not-implemented' + result['error_message'] = str(e)[:1600] + return result + + assert resource + + if resource.terminal_url: + result['terminal'] = { + "terminal_url": resource.terminal_url, + "terminal_dt": resource.terminal_dt, + "terminal_status_code": resource.terminal_status_code, + } + if resource.terminal_url not in result['hops']: + result['hops'].append(resource.terminal_url) + + if not resource.hit: + result['status'] = resource.status + return result + + if resource.terminal_url: + for pattern in self.base_url_blocklist: + if pattern in resource.terminal_url: + result['status'] = 'skip-url-blocklist' + return result + + if resource.terminal_url: + for pattern in self.cookie_blocklist: + if pattern in resource.terminal_url: + result['status'] = 'blocked-cookie' + return result + + if not resource.body: + result['status'] = 'null-body' + return result + + if len(resource.body) > MAX_BODY_SIZE_BYTES: + result['status'] = 'body-too-large' + return result + + file_meta = gen_file_metadata(resource.body) + try: + file_meta, resource = fix_transfer_encoding(file_meta, resource) + except Exception as e: + result['status'] = 'bad-gzip-encoding' + result['error_message'] = str(e) + return result + + if not resource.body or file_meta['size_bytes'] == 0: + result['status'] = 'null-body' + return result + + # here we split based on ingest type to try and extract a next hop + html_ish_resource = bool( + "html" in file_meta['mimetype'] + or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" + or "application/xml" in file_meta['mimetype'] + or "text/xml" in file_meta['mimetype'] + ) + html_biblio = None + html_doc = None + if html_ish_resource and resource.body: + try: + html_doc = HTMLParser(resource.body) + html_biblio = html_extract_biblio(resource.terminal_url, html_doc) + if html_biblio: + if not 'html_biblio' in result or html_biblio.title: + result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True)) + #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) + except ValueError: + pass + + if ingest_type == "pdf" and html_ish_resource: + + # the new style of URL extraction (already computed) + if html_biblio and html_biblio.pdf_fulltext_url: + fulltext_url = dict( + pdf_url=html_biblio.pdf_fulltext_url, + technique="html_biblio", + ) + else: + fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body) + + result['extract_next_hop'] = fulltext_url + if not fulltext_url: + result['status'] = 'no-pdf-link' + return result 
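+                # Illustrative note: fulltext_url here is a small dict such as
+                #   {"pdf_url": "https://example.com/paper.pdf", "technique": "html_biblio"}
+                # (the URL is a hypothetical example); its pdf_url/next_url value
+                # becomes the next hop to fetch below.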
+ next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url') or "" + assert next_url + next_url = clean_url(next_url) + print("[PARSE {:>6}] {} {}".format( + ingest_type, + fulltext_url.get('technique'), + next_url, + ), + file=sys.stderr) + if next_url in hops: + result['status'] = 'link-loop' + result['error_message'] = "repeated: {}".format(next_url) + return result + hops.append(next_url) + continue + elif ingest_type in ("xml", "html", "component") and html_ish_resource and html_biblio: + # NOTE: src_fulltext_url is not a thing + next_url_found = None + if ingest_type == "xml" and html_biblio.xml_fulltext_url: + next_url_found = html_biblio.xml_fulltext_url + elif ingest_type == "html" and html_biblio.html_fulltext_url: + next_url_found = html_biblio.html_fulltext_url + elif ingest_type == "component" and html_biblio.component_url: + next_url_found = html_biblio.component_url + + if next_url_found: + next_url = next_url_found + technique = "html_biblio" + print("[PARSE {:>6}] {} {}".format( + ingest_type, + technique, + next_url, + ), + file=sys.stderr) + if next_url in hops: + if ingest_type == "html": + # for HTML ingest, we don't count this as a link-loop + break + result['status'] = 'link-loop' + result['error_message'] = "repeated: {}".format(next_url) + return result + hops.append(next_url) + continue + + # default is to NOT keep hopping + break + + if len(hops) >= self.max_hops: + result['status'] = "max-hops-exceeded" + return result + + # fetch must be a hit if we got this far (though not necessarily an ingest hit!) + assert resource + assert resource.hit == True + assert resource.terminal_status_code in (200, 226) + + if resource.terminal_url: + result['terminal'] = { + "terminal_url": resource.terminal_url, + "terminal_dt": resource.terminal_dt, + "terminal_status_code": resource.terminal_status_code, + "terminal_sha1hex": file_meta['sha1hex'], + } + + result['file_meta'] = file_meta + result['cdx'] = cdx_to_dict(resource.cdx) + if resource.revisit_cdx: + result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx) + + if ingest_type == "pdf": + if file_meta['mimetype'] != "application/pdf": + result['status'] = "wrong-mimetype" # formerly: "other-mimetype" + return result + elif ingest_type == "xml": + if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"): + result['status'] = "wrong-mimetype" + return result + elif ingest_type == "html": + if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"): + result['status'] = "wrong-mimetype" + return result + elif ingest_type == "src": + if file_meta['mimetype'] not in self.src_valid_mimetypes: + result['status'] = "wrong-mimetype" + return result + elif ingest_type == "component": + if file_meta['mimetype'] not in self.component_valid_mimetypes: + result['status'] = "wrong-mimetype" + return result + else: + raise NotImplementedError() + + info = self.process_hit(ingest_type, resource, file_meta) + result.update(info) + + # check if processing turned up an error + if info.get('status') not in ('success', None): + result['status'] = info['status'] + return result + + result['status'] = "success" + result['hit'] = True + if ingest_type == "pdf": + print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format( + ingest_type, + result.get('file_meta', {}).get('sha1hex'), + result.get('grobid', {}).get('status_code'), + result.get('pdf_meta', {}).get('status'), + ), + file=sys.stderr) + else: + print("[SUCCESS {:>5}] sha1:{}".format( + ingest_type, + result.get('file_meta', 
{}).get('sha1hex'), + ), + file=sys.stderr) + return result + + +class IngestFileRequestHandler(BaseHTTPRequestHandler): + def do_POST(self): + if self.path != "/ingest": + self.send_response(404) + self.end_headers() + self.wfile.write("404: Not Found") + return + length = int(self.headers.get('content-length')) + request = json.loads(self.rfile.read(length).decode('utf-8')) + print("Got request: {}".format(request)) + ingester = IngestFileWorker() + result = ingester.process(request) + self.send_response(200) + self.end_headers() + self.wfile.write(json.dumps(result)) diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py new file mode 100644 index 0000000..c7deea1 --- /dev/null +++ b/python/sandcrawler/minio.py @@ -0,0 +1,99 @@ + +import io +import os +import hashlib + +import minio + + +class SandcrawlerMinioClient(object): + + def __init__(self, host_url, access_key, secret_key, default_bucket=None): + """ + host is minio connection string (host:port) + access and secret key are as expected + default_bucket can be supplied so that it doesn't need to be repeated for each function call + + Example config: + + host="localhost:9000", + access_key=os.environ['SANDCRAWLER_BLOB_ACCESS_KEY'], + secret_key=os.environ['SANDCRAWLER_BLOB_ACCESS_KEY'], + """ + self.mc = minio.Minio( + host_url, + access_key=access_key, + secret_key=secret_key, + secure=False, + ) + self.default_bucket = default_bucket + + def _blob_path(self, folder, sha1hex: str, extension: str, prefix): + if not extension: + extension = "" + if not prefix: + prefix = "" + assert len(sha1hex) == 40 + obj_path = "{}{}/{}/{}/{}{}".format( + prefix, + folder, + sha1hex[0:2], + sha1hex[2:4], + sha1hex, + extension, + ) + return obj_path + + def put_blob(self, folder, blob, sha1hex=None, extension="", prefix="", bucket=None): + """ + blob should be bytes + sha1hex is assumed to be sha1 of the blob itself; if not supplied it will be calculated + Uploads blob to path in the given bucket. Files are stored in a top-level + folder, then in two levels of sub-directory based on sha1, then the + filename is SHA1 with an optional file extension. + """ + if type(blob) == str: + blob = blob.encode('utf-8') + assert type(blob) == bytes + if not sha1hex: + h = hashlib.sha1() + h.update(blob) + sha1hex = h.hexdigest() + obj_path = self._blob_path(folder, sha1hex, extension, prefix) + if not bucket: + bucket = self.default_bucket + assert bucket + content_type = "application/octet-stream" + if extension.endswith('.xml'): + content_type = "application/xml" + if extension.endswith('.png'): + content_type = "image/png" + elif extension.endswith('.jpg') or extension.endswith('.jpeg'): + content_type = "image/jpeg" + elif extension.endswith('.txt'): + content_type = "text/plain" + self.mc.put_object( + bucket, + obj_path, + io.BytesIO(blob), + len(blob), + content_type=content_type, + ) + return (bucket, obj_path) + + def get_blob(self, folder, sha1hex, extension="", prefix="", bucket=None): + """ + sha1hex is sha1 of the blob itself + + Fetched blob from the given bucket/folder, using the sandcrawler SHA1 path convention + """ + obj_path = self._blob_path(folder, sha1hex, extension, prefix) + if not bucket: + bucket = self.default_bucket + assert bucket + blob = self.mc.get_object( + bucket, + obj_path, + ) + # TODO: optionally verify SHA-1? 
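+        # A minimal sketch of what that verification could look like (not
+        # implemented here; `blob` is the raw response object from get_object):
+        #   data = blob.read()
+        #   assert hashlib.sha1(data).hexdigest() == sha1hex
+        # Note that reading the stream here would consume it for the caller.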
+ return blob diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py new file mode 100644 index 0000000..a3e2960 --- /dev/null +++ b/python/sandcrawler/misc.py @@ -0,0 +1,222 @@ + +import base64 +import magic +import hashlib +import datetime +from typing import Optional + +import requests +from requests.adapters import HTTPAdapter +from requests.packages.urllib3.util.retry import Retry # pylint: disable=import-error +import urlcanon + + +def clean_url(s: str) -> str: + s = s.strip() + parsed = urlcanon.parse_url(s) + if not parsed.port and parsed.colon_before_port: + parsed.colon_before_port = b'' + return str(urlcanon.whatwg(parsed)) + +def url_fuzzy_equal(left: str, right: str) -> bool: + """ + TODO: use proper surt library and canonicalization for this check + """ + fuzzy_left = '://'.join(clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:]) + fuzzy_right = '://'.join(clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:]) + if fuzzy_left == fuzzy_right: + return True + elif fuzzy_left == fuzzy_right + "/" or fuzzy_right == fuzzy_left + "/": + return True + return False + +def test_url_fuzzy_equal() -> None: + assert True == url_fuzzy_equal( + "http://www.annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree", + "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree") + +def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict: + """ + Takes a file blob (bytestream) and returns hashes and other metadata. + + Returns a dict: size_bytes, md5hex, sha1hex, sha256hex, mimetype + """ + assert blob is not None + if not allow_empty: + assert blob + mimetype = magic.Magic(mime=True).from_buffer(blob) + if mimetype in ("application/xml", "text/xml"): + # crude checks for XHTML or JATS XML, using only first 1 kB of file + if b"<htm" in blob[:1024] and b'xmlns="http://www.w3.org/1999/xhtml"' in blob[:1024]: + mimetype = "application/xhtml+xml" + elif b"<article " in blob[:1024] and not b"<html" in blob[:1024]: + mimetype = "application/jats+xml" + hashes = [ + hashlib.sha1(), + hashlib.sha256(), + hashlib.md5(), + ] + for h in hashes: + h.update(blob) + return dict( + size_bytes=len(blob), + sha1hex=hashes[0].hexdigest(), + sha256hex=hashes[1].hexdigest(), + md5hex=hashes[2].hexdigest(), + mimetype=mimetype, + ) + +def b32_hex(s: str) -> str: + """ + Converts a base32-encoded SHA-1 checksum into hex-encoded + + base32 checksums are used by, eg, heritrix and in wayback CDX files + """ + s = s.strip().split()[0].lower() + if s.startswith("sha1:"): + s = s[5:] + if len(s) != 32: + if len(s) == 40: + return s + raise ValueError("not a base-32 encoded SHA-1 hash: {}".format(s)) + return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + +NORMAL_MIME = ( + 'application/pdf', + 'application/postscript', + 'text/html', + 'text/xml', + 'application/octet-stream', +) + +def normalize_mime(raw: str) -> Optional[str]: + raw = raw.lower().strip() + for norm in NORMAL_MIME: + if raw.startswith(norm): + return norm + + # Special cases + if raw.startswith('application/xml'): + return 'text/xml' + if raw.startswith('application/x-pdf'): + return 'application/pdf' + if raw in ( + '.pdf', + ): + return 'application/pdf' + if raw in ( + 'application/download', + 'binary/octet-stream', + 'unk', + 'application/x-download', + 'application/octetstream', + 'application/force-download', + 'application/unknown', + ): + 
return 'application/octet-stream' + return None + + +def test_normalize_mime(): + assert normalize_mime("asdf") is None + assert normalize_mime("application/pdf") == "application/pdf" + assert normalize_mime("application/pdf+journal") == "application/pdf" + assert normalize_mime("Application/PDF") == "application/pdf" + assert normalize_mime("application/p") is None + assert normalize_mime("application/xml+stuff") == "text/xml" + assert normalize_mime("application/x-pdf") == "application/pdf" + assert normalize_mime("application/x-html") is None + assert normalize_mime("unk") == "application/octet-stream" + assert normalize_mime("binary/octet-stream") == "application/octet-stream" + + +def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]: + """ + This method always filters a few things out: + + - non-HTTP requests, based on lack of status code (eg, whois) + """ + + cdx = raw_cdx.split() + if len(cdx) < 11: + return None + + surt = cdx[0] + dt = cdx[1] + url = cdx[2] + mime = normalize_mime(cdx[3]) + http_status = cdx[4] + sha1b32 = cdx[5] + c_size = cdx[8] + offset = cdx[9] + warc = cdx[10] + + if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit() + and len(sha1b32) == 32 and dt.isdigit()): + return None + + if '-' in (surt, dt, url, http_status, sha1b32, c_size, offset, warc): + return None + + if mime is None or mime == '-': + mime = "application/octet-stream" + + if normalize: + mime = normalize_mime(mime) + + sha1hex = b32_hex(sha1b32) + + return dict( + surt=surt, + url=url, + datetime=dt, + mimetype=mime, + http_status=int(http_status), + sha1b32=sha1b32, + sha1hex=sha1hex, + warc_csize=int(c_size), + warc_offset=int(offset), + warc_path=warc, + ) + +def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]: + if not dt_str: + return None + try: + return datetime.datetime.strptime(dt_str, "%Y%m%d%H%M%S") + except Exception: + return None + +def test_parse_cdx_datetime() -> None: + assert parse_cdx_datetime("") == None + assert parse_cdx_datetime("asdf") == None + assert parse_cdx_datetime("19930203123045") != None + assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3) + +def datetime_to_cdx(dt: datetime.datetime) -> str: + return '%04d%02d%02d%02d%02d%02d' % ( + dt.year, dt.month, dt.day, + dt.hour, dt.minute, dt.second, + ) + +def test_datetime_to_cdx() -> None: + assert "20201028235103" == datetime_to_cdx(datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)) + +def requests_retry_session(retries=10, backoff_factor=3, + status_forcelist=(500, 502, 504), session=None) -> requests.Session: + """ + From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests + """ + session = session or requests.Session() + retry = Retry( + total=retries, + read=retries, + connect=retries, + backoff_factor=backoff_factor, + status_forcelist=status_forcelist, + ) + adapter = HTTPAdapter(max_retries=retry) + session.mount('http://', adapter) + session.mount('https://', adapter) + return session + diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py new file mode 100644 index 0000000..9b4e834 --- /dev/null +++ b/python/sandcrawler/pdfextract.py @@ -0,0 +1,470 @@ + +import sys +import json +import datetime +from io import BytesIO +from dataclasses import dataclass +from typing import Optional, Dict, Any + +import poppler +from PIL import Image + +from .workers import SandcrawlerWorker, SandcrawlerFetchWorker +from .misc import 
gen_file_metadata + + +# This is a hack to work around timeouts when processing certain PDFs with +# poppler. For some reason, the usual Kafka timeout catcher isn't working on +# these, maybe due to threading. +BAD_PDF_SHA1HEX = [ + "011478a1e63a2a31eae1a93832a74cc95f220760", + "018dfe9824de6d2ac068ce0f7dc9961bffa1b558", + "057c7a9dfb611bfd52f7de6c39b2d5757c5e4e53", + "06061af0707298c12932516d1bb7c2b6dc443824", + "0641822e68c5a07538b967489fd19a1d5dc371a5", + "09cba9b00494d12759c50cb914f1fb7c9746f5d1", + "09db7c9f2efb496c974427a61e84292ae27fc702", + "0a1c13cb8783bbbf248b2345b9890e2410aa3f0a", + "0ccc6dc94f4e2d809fac8543870265c3421f3c9e", + "0d1c1567ea70e7b922ba88ccb868ffc7ca18e75c", + "10c6577a658bf6203557e2998b25ea9788f8adfe", + "15a720921ce30da983fcd1bfa7fe9aeeda503e41", + "1659881a31edc2d0e170f6bb26d32e74cc4ca387", + "17e679b0ec9444fff2ea4d02caec05dd2de80ec3", + "182749ad1db1d5e999d07f010bdcfc2978dadc88", + "1a17a4fc43397804830cc29021281aac2e8cf0cb", + "1cb166f0c0b5ffe673e6bbf6a29d77278711f253", + "1d04e46b6848e6479dd90fe26bb11627044fb664", + "1d967c95546d31edaaf0c3ef9ffcc11113a9e11a", + "1f90194bf0c7fff1fe1ed5fff77a934c7a1b32a0", + "20589d9dd0a22c8c938ad97b7f4f12648aa119fa", + "2195e528fa1cf5f8ae3b2adcc516896016c3411f", + "25ab9e6169f041be05844a9b4edd6574918af769", + "281de904c4642a9be4f17b9774fc0a2bdc8a90e3", + "2bd5322975653536550a039eb055174b2bf241b3", + "2fc64da736175810918fd32c94c5068b0d660bcc", + "32318fba9b05b2756b7362bcaa4722c92ed8d449", + "336833c6fc968cd0938250dfc93c032a30111cfc", + "362ad00bc24d650c8f11851f9e554fc560b73e7a", + "373f84dfab4ed47047826e604e2918a9cd6a95b2", + "3ac0b6e17e30d141871a0a5b127536919fe5aa19", + "3c8a6a708da0dc1802f5f3e5267a49b3c25e1ffe", + "3e5f9fb94e7314447a22f3d009419a922136177f", + "3fad493c940137ce703f2f570ebb504e360c6df3", + "40aa94602ab13e5a7d9df8c989fca4fa5c01239e", + "427479c94d7d0e512f898bc7ff0b6f210069f902", + "436c9183724f051b22c96285aa8ff1d2ba709574", + "43a8c0abf0386d3e3397cf5e22a884761dd63db7", + "445968ef735b228c08c3ff4238d99fc9f4824619", + "447fa6b5a90742a86429a932f6608d8e141688c0", + "45f014d7d631559dc7726e5c5513f1e7c91c48a9", + "47577ff6d6876117ca69bec60a5764f7d2c2ec70", + "4785181cec8944eee00ddb631a5dfc771b89bab7", + "47db2db2cc976429568841a0496c0ab4ed7b5977", + "481c0bae81873988fcc8662ba8a269e8823fdea2", + "4c81129904f7976a50825595a3497ea7b52579ef", + "4edc1402712fa6827c4501fed8042e9f4447829c", + "50b3c5a3122272aca69855ef06b85d0b43a76eb1", + "52fc9b3c5199ef395d410c7cee5961dc812e4d29", + "53471346019947a88c1ba141fb829375527153b0", + "58d9ae7dcb0a7dbbdfc58ad266030b037e9cd0ff", + "59cfc843ebdb1c1e5db1efc76a40f46cb3bb06f0", + "5ab98405b676ee81a6ca74fba51a9e4a6cff7311", + "5e04779cbbae5ce88bb786064f756885dd6895fe", + "5e6a3adde9f08c276c4efd72bfacb256f2ec35d9", + "623ff84b616383d0a3e0dd8dbce12f0b5fe9a6ac", + "646c4a654270606256397684204ff0f3d17be2e7", + "64d821d728f9a3dc944b4c03be00feea0b57e314", + "689b5cb3ddef213d612363a903f10d0358ea64d2", + "6909f0b62d8b7835de3dec7777aad7f8ef507ee3", + "74e617dc95555e8ca3aadd19d0c85b71cd77d1d9", + "75c2662a96ccc48891228df7c85eb7d4da9dd621", + "771f1ca0007a6fbed5b4a434c73f524f715d33c1", + "776859635e9dc01d97b0582f49c814ffbcb019fb", + "781dafda896a9f5c30f3d0a011f79a3b79b574c4", + "788672c7c2bcdecf6e2f6a2177c01e60f04d9cfb", + "79d6cba3c6e577a0f3a3a9fe575680d38454938d", + "7cfc0739be9c49d94272110a0a748256bdde9be6", + "7daf61526ec825151f384cc1db510ca5237d5d80", + "7e9d846f3bf9ce15cdb991b78cc870ab8a2bed76", + "8398b211a5ec4da1195a4ba1bc29ca8c0ac40f67", + "859d7ec532a0bf3b52b17c7f2d8ecc58410c0aad", + 
"88edcbab1cac2d70af5870422974afc253f4f0c6", + "89860fc475fcb2a2d86c4544df52ec8fd5e6533f", + "8dcaf4ef132900dd378f7be526c884b17452713b", + "8e4f03c29ae1fe7227140ab4b625f375f6c00d31", + "949dfb7d833da9576b2ccb9eb1ab5457469c53d3", + "961ec451172f373f919c593737466300e42062cb", + "976989fa6e447578d9ce16ec5b526f0e09d6df50", + "98b02eb70066c182c705ef4d14d8b723ad7f1fab", + "993ca31f6974f8387bb18dd7d38987d290da8781", + "9dbd05af3442e6f42d67868054751b76973f4171", + "a2298c137b9c8c8975bad62eea9224edb95e6952", + "a2671738755ab8b24775e95375dc72f1ca4e5fd6", + "a26f299fb97c646effeebd4c5e2968786bd0f781", + "a48f9b7ad627909f76d780aa4208530304ece42c", + "a69665d0b5d3b95f54f68406eee3ed50c67efb45", + "a69665d0b5d3b95f54f68406eee3ed50c67efb45", + "a8357c31837404f9ebd798999d546c9398ab3648", + "a9162b9aef5e5da0897275fede1a6cff8cc93dfc", + "ad038725bf6855a79f3c768ebe93c7103d14522f", + "aef581bf42e76e527f5aed3b8958fd4e7a24819f", + "b2b66b9c7f817a20144456f99c0be805602e8597", + "b2d719120306b90eb8dd3580b699a61ec70556f4", + "b4b8e18e27f102e59b2be2d58c7b54d0a0eb457a", + "b5be7f409a3a2601208c5ce08cf52b9ac1094aae", + "b5bf8b7467fb095c90adf3b49aa1687291e4469c", + "b8b427e5b3d650ba9e03197f9c3917e25b878930", + "bad48b89b639b5b7df2c6a2d5288181fcb8b0e35", + "be0cda7642e9247b3ee41cd2017fa709aab4f344", + "c1b583fbd052572f08158d39ffe4d7510dadbebb", + "c2526f75a013dc67b14ce1e2d0e4fc80bb93c6e1", + "c4abbb284f4acaca9e8ceb88f842901984e84d33", + "c7220d1bf1e71fb755d9f26bbdd4c539dc162960", + "c7687fa6f637c7d32a25be0e772867d87536d35c", + "c7d8b37ec99cf0d987e60667f05299f200e18a5d", + "c92b9ae9eefa07504950b405625aef54b48f0e1a", + "ccb1debcfae006a3fc984e9e91309b9706a5c375", + "cd611c765cbb0b3b7cb2fdc07d8f0b9cc93ec257", + "cd8a7c3b8d850ebedc1ca791ccb37b9a2689f9c3", + "d055c054c330f99ec011e37186d2b429339758fd", + "d17b1e254cce82df5c6eb4fd492cef91e7e11558", + "d188762a7e3ab5d4ee8a897204316513e4e636ec", + "d613b9e4442f5d5d19ea6814fa9729bff7da7c85", + "d6b0f405bf13c23d0e90c54eea527442786d1cd3", + "da2211ee2dbc6dda36571976d810e2366a3d2504", + "e01bb7256d77aea258313bb410dfcfc10512f420", + "e2bf5d0a5885359381fe8ef2cd9290171d494e9b", + "e2c3b8a2cf33d5e8972bc9ddb78373766a75e412", + "e64714a81f60ab9286ec90cad682cb22e564fb6f", + "e9d7716b4f94bbc3d94459b5fe9bb8b15cb2e433", + "e9e84e17383e93a784a8471708619162b32fb399", + "eac7df5f799983d5a7cc55d10b4d426dc557febf", + "eaf84b2efd2f69c7b3f407f89ea66ac4c41fac36", + "eb1b39fd7a874896688855a22efddef10272427c", + "eb5fffaa590a52bcc3705b888c6ff9c4dc4c45b2", + "edf8dcc8736f06afbaca0e01d60bd2c475403a3d", + "ee2ee6ae2cf05128810d0d95bbe69bd263e140de", + "ee9530a2c5a3d1e3813ccb51a55cc8b0d9b5dfc7", + "ef1dfa325c21cff4cd8bb1a9b6c4ee6996d43c8f", + "ef6749d9263a01f921ba7d72df0d17671d14e5f6", + "f0ea221d8587cede25592266486e119d277f7096", + "f68f9a9202a75d2aee35252e104d796f9515001e", + "f9314d3bf2eac78a7d78d18adcccdb35542054ef", + "fd9bd560662e070b222d63052830837829c490f0", +] + +@dataclass +class PdfExtractResult: + sha1hex: str + status: str + error_msg: Optional[str] = None + file_meta: Optional[Dict[str,Any]] = None + text: Optional[str] = None + page0_thumbnail: Optional[bytes] = None + has_page0_thumbnail: bool = False + meta_xml: Optional[str] = None + pdf_info: Optional[Dict[str,Any]] = None + pdf_extra: Optional[Dict[str,Any]] = None + source: Optional[Dict[str,Any]] = None + + def to_pdftext_dict(self) -> dict: + """ + Outputs a JSON string as would be published to Kafka text/info topic. 
+ """ + return { + 'key': self.sha1hex, + 'sha1hex': self.sha1hex, + 'status': self.status, + 'file_meta': self.file_meta, + 'error_msg': self.error_msg, + 'text': self.text, + 'has_page0_thumbnail': self.has_page0_thumbnail, + 'meta_xml': self.meta_xml, + 'pdf_info': self.pdf_info, + 'pdf_extra': self.pdf_extra, + 'source': self.source, + } + + @classmethod + def from_pdftext_dict(cls, record): + """ + Outputs a JSON string as would be published to Kafka text/info topic. + """ + if record['status'] != 'success': + return PdfExtractResult( + sha1hex=record.get('sha1hex') or record['key'], + status=record['status'], + error_msg=record.get('error_msg'), + ) + else: + return PdfExtractResult( + sha1hex=record['sha1hex'], + status=record['status'], + file_meta=record.get('file_meta'), + text=record.get('text'), + has_page0_thumbnail=bool(record.get('has_page0_thumbnail', False)), + meta_xml=record.get('meta_xml'), + pdf_info=record.get('pdf_info'), + pdf_extra=record.get('pdf_extra'), + ) + + @classmethod + def from_pdf_meta_dict(cls, record): + """ + Parses what would be returned from postgrest + """ + if record['status'] != 'success': + return PdfExtractResult( + sha1hex=record['sha1hex'], + status=record['status'], + error_msg=(record.get('metadata') or {}).get('error_msg'), + ) + else: + pdf_extra = dict() + for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id', 'pdf_version'): + if record.get(k): + pdf_extra[k] = record[k] + return PdfExtractResult( + sha1hex=record['sha1hex'], + status=record['status'], + has_page0_thumbnail=bool(record.get('has_page0_thumbnail', False)), + pdf_info=record.get('metadata'), + pdf_extra=pdf_extra, + ) + + def to_sql_tuple(self) -> tuple: + # pdf_meta (sha1hex, updated, status, page0_thumbnail, page_count, + # word_count, page0_height, page0_width, permanent_id, pdf_created, + # pdf_version, metadata) + word_count: Optional[int] = None + if self.text: + word_count = len(self.text.split()) + metadata: Optional[Dict] = None + pdf_extra = self.pdf_extra or dict() + pdf_created = None + # TODO: form, encrypted + if self.pdf_info: + metadata = dict() + for k in ('Title', 'Subject', 'Author', 'Creator', 'Producer', 'doi'): + if k in self.pdf_info: + metadata[k.lower()] = self.pdf_info[k] + if 'CreationDate' in self.pdf_info: + pdf_created = self.pdf_info['CreationDate'] + metadata_json: Optional[str] = None + if metadata: + metadata_json = json.dumps(metadata, sort_keys=True) + return ( + self.sha1hex, + datetime.datetime.now(), # updated + self.status, + self.has_page0_thumbnail, + pdf_extra.get('page_count'), + word_count, + pdf_extra.get('page0_height'), + pdf_extra.get('page0_width'), + pdf_extra.get('permanent_id'), + pdf_created, + pdf_extra.get('pdf_version'), + metadata_json, + ) + + +def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtractResult: + """ + A known issue is that output text is in "physical layout" mode, which means + columns will be side-by-side. We would prefer a single stream of tokens! + + Tried using page.text(layout_mode=poppler.TextLayout.raw_order_layout) + instead of the default mode (poppler.TextLayout.physical_layout), but that + didn't seem to work at all (returned empty strings). 
+ """ + file_meta = gen_file_metadata(blob) + sha1hex = file_meta['sha1hex'] + if file_meta['mimetype'] != 'application/pdf': + return PdfExtractResult( + sha1hex=sha1hex, + status='not-pdf', + error_msg=f"mimetype is '{file_meta['mimetype']}'", + file_meta=file_meta, + ) + + if sha1hex in BAD_PDF_SHA1HEX: + return PdfExtractResult( + sha1hex=sha1hex, + status='bad-pdf', + error_msg=f"PDF known to cause processing issues", + file_meta=file_meta, + ) + + print(f"\tpoppler processing: {sha1hex}", file=sys.stderr) + try: + pdf = poppler.load_from_data(blob) + if pdf is None: + return PdfExtractResult( + sha1hex=sha1hex, + status='empty-pdf', + file_meta=file_meta, + has_page0_thumbnail=False, + ) + page0 = pdf.create_page(0) + if page0 is None: + return PdfExtractResult( + sha1hex=sha1hex, + status='empty-page0', + file_meta=file_meta, + ) + # this call sometimes fails an returns an AttributeError + page0rect = page0.page_rect() + except (AttributeError, poppler.document.LockedDocumentError) as e: + # may need to expand the set of exceptions caught here over time, but + # starting with a narrow set + return PdfExtractResult( + sha1hex=sha1hex, + status='parse-error', + error_msg=str(e), + file_meta=file_meta, + ) + + assert page0 is not None + page0_thumbnail: Optional[bytes] = None + renderer = poppler.PageRenderer() + try: + full_img = renderer.render_page(page0) + img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw', "BGRA", 0, 1) + img.thumbnail(thumb_size, Image.BICUBIC) + buf = BytesIO() + img.save(buf, thumb_type) + page0_thumbnail = buf.getvalue() + # assuming that very small images mean something went wrong + if page0_thumbnail is None or len(page0_thumbnail) < 50: + page0_thumbnail = None + except Exception as e: + print(str(e), file=sys.stderr) + page0_thumbnail = None + + try: + full_text = page0.text() + for n in range(1, pdf.pages): + pageN = pdf.create_page(n) + full_text += pageN.text() + except AttributeError as e: + return PdfExtractResult( + sha1hex=sha1hex, + status='parse-error', + error_msg=str(e), + file_meta=file_meta, + ) + + # Kafka message size limit; cap at about 1 MByte + if len(full_text)> 1000000: + return PdfExtractResult( + sha1hex=sha1hex, + status='text-too-large', + error_msg="full_text chars: {}".format(len(full_text)), + file_meta=file_meta, + ) + if len(pdf.metadata)> 1000000: + return PdfExtractResult( + sha1hex=sha1hex, + status='text-too-large', + error_msg="meta_xml chars: {}".format(len(full_text)), + file_meta=file_meta, + ) + + try: + pdf_info = pdf.infos() + except UnicodeDecodeError: + return PdfExtractResult( + sha1hex=sha1hex, + status='bad-unicode', + error_msg="in infos()", + file_meta=file_meta, + ) + + # TODO: is this actually needed? or does json marshalling work automatically? 
+ for k in pdf_info.keys(): + if isinstance(pdf_info[k], datetime.datetime): + pdf_info[k] = datetime.datetime.isoformat(pdf_info[k]) + + permanent_id: Optional[str] = None + update_id: Optional[str] = None + try: + permanent_id = pdf.pdf_id.permanent_id + update_id = pdf.pdf_id.update_id + except TypeError: + pass + + return PdfExtractResult( + sha1hex=sha1hex, + file_meta=file_meta, + status='success', + error_msg=None, + text=full_text or None, + has_page0_thumbnail=page0_thumbnail is not None, + page0_thumbnail=page0_thumbnail, + meta_xml=pdf.metadata or None, + pdf_info=pdf_info, + pdf_extra=dict( + page0_height=page0rect.height, + page0_width=page0rect.width, + page_count=pdf.pages, + permanent_id=permanent_id, + update_id=update_id, + pdf_version=f"{pdf.pdf_version[0]}.{pdf.pdf_version[1]}", + ), + ) + +class PdfExtractWorker(SandcrawlerFetchWorker): + + def __init__(self, wayback_client=None, sink=None, **kwargs): + super().__init__(wayback_client=wayback_client) + self.wayback_client = wayback_client + self.sink = sink + self.thumbnail_sink = kwargs.get('thumbnail_sink') + + def timeout_response(self, task) -> Dict: + default_key = task['sha1hex'] + return dict( + status="error-timeout", + error_msg="internal pdf-extract worker timeout", + source=task, + sha1hex=default_key, + ) + + def process(self, record, key: Optional[str] = None): + default_key = record['sha1hex'] + + fetch_result = self.fetch_blob(record) + if fetch_result['status'] != 'success': + return fetch_result + blob = fetch_result['blob'] + + result = process_pdf(blob) + result.source = record + if self.thumbnail_sink and result.page0_thumbnail is not None: + self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) + return result.to_pdftext_dict() + +class PdfExtractBlobWorker(SandcrawlerWorker): + """ + This is sort of like PdfExtractWorker, except it receives blobs directly, + instead of fetching blobs from some remote store. + """ + + def __init__(self, sink=None, **kwargs): + super().__init__() + self.sink = sink + self.thumbnail_sink = kwargs.get('thumbnail_sink') + + def process(self, blob, key: Optional[str] = None): + if not blob: + return None + assert isinstance(blob, bytes) + + result = process_pdf(blob) + if self.thumbnail_sink and result.page0_thumbnail is not None: + self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) + + return result.to_pdftext_dict() + diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py new file mode 100644 index 0000000..161dc9c --- /dev/null +++ b/python/sandcrawler/pdftrio.py @@ -0,0 +1,130 @@ + +import time +import requests + +from .workers import SandcrawlerWorker, SandcrawlerFetchWorker +from .misc import gen_file_metadata, requests_retry_session + + +class PdfTrioClient(object): + + def __init__(self, host_url="http://pdftrio.qa.fatcat.wiki", **kwargs): + self.host_url = host_url + self.http_session = requests_retry_session(retries=3, backoff_factor=3) + + def classify_pdf(self, blob, mode="auto"): + """ + Returns a dict with at least: + + - status_code (int, always set) + - status (success, or error-*) + + On success, the other remote API JSON response keys are also included. + + On HTTP-level failures, the status_code and status field are set + appropriately; an optional `error_msg` may also be set. For some other + errors, like connection failure, an exception is raised. 
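+        For example (see the handlers below), a request timeout yields:
+
+            {'status': 'error-timeout', 'status_code': -4, 'error_msg': ...}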
+ """ + assert blob + + try: + pdftrio_response = requests.post( + self.host_url + "/classify/research-pub/" + mode, + files={ + 'pdf_content': blob, + }, + timeout=60.0, + ) + except requests.Timeout: + return { + 'status': 'error-timeout', + 'status_code': -4, # heritrix3 "HTTP timeout" code + 'error_msg': 'pdftrio request (HTTP POST) timeout', + } + except requests.exceptions.ConnectionError: + # crude back-off + time.sleep(2.0) + return { + 'status': 'error-connect', + 'status_code': -2, # heritrix3 "HTTP connect" code + 'error_msg': 'pdftrio request connection timout', + } + + info = dict( + status_code=pdftrio_response.status_code, + ) + if pdftrio_response.status_code == 200: + resp_json = pdftrio_response.json() + assert 'ensemble_score' in resp_json + assert 'status' in resp_json + assert 'versions' in resp_json + info.update(resp_json) + else: + info['status'] = 'error' + # TODO: might return JSON with some info? + + info['_total_sec'] = pdftrio_response.elapsed.total_seconds() + return info + + +class PdfTrioWorker(SandcrawlerFetchWorker): + """ + This class is basically copied directly from GrobidWorker + """ + + def __init__(self, pdftrio_client, wayback_client=None, sink=None, **kwargs): + super().__init__(wayback_client=wayback_client) + self.pdftrio_client = pdftrio_client + self.sink = sink + + def process(self, record, key=None): + start_process = time.time() + default_key = record['sha1hex'] + fetch_sec = None + + start = time.time() + fetch_result = self.fetch_blob(record) + fetch_sec = time.time() - start + if fetch_result['status'] != 'success': + return fetch_result + blob = fetch_result['blob'] + + result = dict() + result['file_meta'] = gen_file_metadata(blob) + result['key'] = result['file_meta']['sha1hex'] + result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob) + result['source'] = record + result['timing'] = dict( + pdftrio_sec=result['pdf_trio'].pop('_total_sec', None), + total_sec=time.time() - start_process, + ) + if fetch_sec: + result['timing']['fetch_sec'] = fetch_sec + return result + +class PdfTrioBlobWorker(SandcrawlerWorker): + """ + This is sort of like PdfTrioWorker, except it receives blobs directly, + instead of fetching blobs from some remote store. 
+ """ + + def __init__(self, pdftrio_client, sink=None, mode="auto", **kwargs): + super().__init__() + self.pdftrio_client = pdftrio_client + self.sink = sink + self.mode = mode + + def process(self, blob, key=None): + start_process = time.time() + if not blob: + return None + result = dict() + result['file_meta'] = gen_file_metadata(blob) + result['key'] = result['file_meta']['sha1hex'] + result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob, mode=self.mode) + result['timing'] = dict( + pdftrio_sec=result['pdf_trio'].pop('_total_sec', None), + total_sec=time.time() - start_process, + ) + return result + diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py new file mode 100644 index 0000000..a388b90 --- /dev/null +++ b/python/sandcrawler/persist.py @@ -0,0 +1,584 @@ + +""" +cdx +- read raw CDX, filter +- push to SQL table + +ingest-file-result +- read JSON format (batch) +- cdx SQL push batch (on conflict skip) +- file_meta SQL push batch (on conflict update) +- ingest request push batch (on conflict skip) +- ingest result push batch (on conflict update) + +grobid +- reads JSON format (batch) +- grobid2json +- minio push (one-by-one) +- grobid SQL push batch (on conflict update) +- file_meta SQL push batch (on conflict update) +""" + +import os +from typing import Optional, AnyStr +import xml.etree.ElementTree + +from sandcrawler.workers import SandcrawlerWorker +from sandcrawler.db import SandcrawlerPostgresClient +from sandcrawler.minio import SandcrawlerMinioClient +from sandcrawler.grobid import GrobidClient +from sandcrawler.pdfextract import PdfExtractResult +from sandcrawler.html_ingest import HtmlMetaRow + + +class PersistCdxWorker(SandcrawlerWorker): + + def __init__(self, db_url, **kwargs): + super().__init__() + self.db = SandcrawlerPostgresClient(db_url) + self.cur = self.db.conn.cursor() + + def process(self, record, key=None): + """ + Only do batches (as transactions) + """ + raise NotImplementedError + + def push_batch(self, batch): + self.counts['total'] += len(batch) + # filter to full CDX lines, no liveweb + cdx_batch = [r for r in batch if r.get('warc_path') and ("/" in r['warc_path'])] + resp = self.db.insert_cdx(self.cur, cdx_batch) + if len(cdx_batch) < len(batch): + self.counts['skip'] += len(batch) - len(cdx_batch) + self.counts['insert-cdx'] += resp[0] + self.counts['update-cdx'] += resp[1] + self.db.commit() + return [] + +class PersistIngestFileResultWorker(SandcrawlerWorker): + + def __init__(self, db_url, **kwargs): + super().__init__() + self.db = SandcrawlerPostgresClient(db_url) + self.cur = self.db.conn.cursor() + + def process(self, record, key=None): + """ + Only do batches (as transactions) + """ + raise NotImplementedError + + def request_to_row(self, raw): + """ + Converts ingest-request JSON schema (eg, from Kafka) to SQL ingest_request schema + + if there is a problem with conversion, return None + """ + # backwards compat hacks; transform request to look like current schema + if raw.get('ingest_type') == 'file': + raw['ingest_type'] = 'pdf' + if (not raw.get('link_source') + and raw.get('base_url') + and raw.get('ext_ids', {}).get('doi') + and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])): + # set link_source(_id) for old ingest requests + raw['link_source'] = 'doi' + raw['link_source_id'] = raw['ext_ids']['doi'] + if (not raw.get('link_source') + and raw.get('ingest_request_source', '').startswith('savepapernow') + and raw.get('fatcat', {}).get('release_ident')): + # set link_source(_id) for old 
ingest requests + raw['link_source'] = 'spn' + raw['link_source_id'] = raw['fatcat']['release_ident'] + + for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'): + if not k in raw: + self.counts['skip-request-fields'] += 1 + return None + if raw['ingest_type'] not in ('pdf', 'xml', 'html'): + self.counts['skip-ingest-type'] += 1 + return None + request = { + 'ingest_type': raw['ingest_type'], + 'base_url': raw['base_url'], + 'link_source': raw['link_source'], + 'link_source_id': raw['link_source_id'], + 'ingest_request_source': raw.get('ingest_request_source'), + 'request': {}, + } + # extra/optional fields + if raw.get('release_stage'): + request['release_stage'] = raw['release_stage'] + if raw.get('fatcat', {}).get('release_ident'): + request['request']['release_ident'] = raw['fatcat']['release_ident'] + for k in ('ext_ids', 'edit_extra', 'rel'): + if raw.get(k): + request['request'][k] = raw[k] + # if this dict is empty, trim it to save DB space + if not request['request']: + request['request'] = None + return request + + + def file_result_to_row(self, raw: dict) -> Optional[dict]: + """ + Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema + + if there is a problem with conversion, return None and set skip count + """ + for k in ('request', 'hit', 'status'): + if not k in raw: + self.counts['skip-result-fields'] += 1 + return None + if not 'base_url' in raw['request']: + self.counts['skip-result-fields'] += 1 + return None + ingest_type = raw['request'].get('ingest_type') + if ingest_type == 'file': + ingest_type = 'pdf' + if ingest_type not in ('pdf', 'xml', 'html'): + self.counts['skip-ingest-type'] += 1 + return None + if raw['status'] in ("existing", ): + self.counts['skip-existing'] += 1 + return None + result = { + 'ingest_type': ingest_type, + 'base_url': raw['request']['base_url'], + 'hit': raw['hit'], + 'status': raw['status'], + } + terminal = raw.get('terminal') + if terminal: + result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url') + result['terminal_dt'] = terminal.get('terminal_dt') + result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code') + if result['terminal_status_code']: + result['terminal_status_code'] = int(result['terminal_status_code']) + result['terminal_sha1hex'] = terminal.get('terminal_sha1hex') + if len(result['terminal_url']) > 2048: + # postgresql13 doesn't like extremely large URLs in b-tree index + self.counts['skip-huge-url'] += 1 + return None + return result + + def result_to_html_meta(self, record: dict) -> Optional[HtmlMetaRow]: + html_body = record.get('html_body') + file_meta = record.get('file_meta') + if not (file_meta and html_body): + return None + return HtmlMetaRow( + sha1hex=file_meta["sha1hex"], + status=record.get('status'), + scope=record.get('scope'), + has_teixml=bool(html_body and html_body['status'] == 'success'), + has_thumbnail=False, # TODO + word_count=(html_body and html_body.get('word_count')) or None, + biblio=record.get('html_biblio'), + resources=record.get('html_resources'), + ) + + def push_batch(self, batch): + self.counts['total'] += len(batch) + + if not batch: + return [] + + results = [self.file_result_to_row(raw) for raw in batch] + results = [r for r in results if r] + + requests = [self.request_to_row(raw['request']) for raw in batch if raw.get('request')] + requests = [r for r in requests if r] + + if requests: + resp = self.db.insert_ingest_request(self.cur, requests) + 
self.counts['insert-requests'] += resp[0] + self.counts['update-requests'] += resp[1] + if results: + resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update") + self.counts['insert-results'] += resp[0] + self.counts['update-results'] += resp[1] + + # these schemas match, so can just pass through + cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')] + revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')] + cdx_batch.extend(revisit_cdx_batch) + # filter to full CDX lines, with full warc_paths (not liveweb) + cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])] + if cdx_batch: + resp = self.db.insert_cdx(self.cur, cdx_batch) + self.counts['insert-cdx'] += resp[0] + self.counts['update-cdx'] += resp[1] + + file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')] + if file_meta_batch: + resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="nothing") + self.counts['insert-file_meta'] += resp[0] + self.counts['update-file_meta'] += resp[1] + + html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')] + if html_meta_batch: + resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="update") + self.counts['insert-html_meta'] += resp[0] + self.counts['update-html_meta'] += resp[1] + + self.db.commit() + return [] + +class PersistIngestRequestWorker(PersistIngestFileResultWorker): + + def __init__(self, db_url, **kwargs): + super().__init__(db_url=db_url) + + def process(self, record, key=None): + """ + Only do batches (as transactions) + """ + raise NotImplementedError + + def push_batch(self, batch): + self.counts['total'] += len(batch) + + if not batch: + return [] + + requests = [self.request_to_row(raw) for raw in batch] + requests = [r for r in requests if r] + + if requests: + resp = self.db.insert_ingest_request(self.cur, requests) + self.counts['insert-requests'] += resp[0] + self.counts['update-requests'] += resp[1] + + self.db.commit() + return [] + +class PersistGrobidWorker(SandcrawlerWorker): + + def __init__(self, db_url, **kwargs): + super().__init__() + self.grobid = GrobidClient() + self.s3 = SandcrawlerMinioClient( + host_url=kwargs.get('s3_url', 'localhost:9000'), + access_key=kwargs['s3_access_key'], + secret_key=kwargs['s3_secret_key'], + default_bucket=kwargs['s3_bucket'], + ) + self.s3_only = kwargs.get('s3_only', False) + self.db_only = kwargs.get('db_only', False) + assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed" + if not self.s3_only: + self.db = SandcrawlerPostgresClient(db_url) + self.cur = self.db.conn.cursor() + else: + self.db = None + self.cur = None + + def process(self, record, key=None): + """ + Only do batches (as transactions) + """ + raise NotImplementedError + + def push_batch(self, batch): + self.counts['total'] += len(batch) + + # filter out bad "missing status_code" timeout rows + missing = [r for r in batch if not r.get('status_code')] + if missing: + self.counts['skip-missing-status'] += len(missing) + batch = [r for r in batch if r.get('status_code')] + + for r in batch: + if r['status_code'] != 200 or not r.get('tei_xml'): + self.counts['s3-skip-status'] += 1 + if r.get('error_msg'): + r['metadata'] = {'error_msg': r['error_msg'][:500]} + continue + + assert len(r['key']) == 40 + if not self.db_only: + resp = self.s3.put_blob( + folder="grobid", + blob=r['tei_xml'], + sha1hex=r['key'], + 
extension=".tei.xml", + ) + self.counts['s3-put'] += 1 + + # enhance with teixml2json metadata, if available + try: + metadata = self.grobid.metadata(r) + except xml.etree.ElementTree.ParseError as xml_e: + r['status'] = 'bad-grobid-xml' + r['metadata'] = {'error_msg': str(xml_e)[:1024]} + continue + if not metadata: + continue + for k in ('fatcat_release', 'grobid_version'): + r[k] = metadata.pop(k, None) + if r.get('fatcat_release'): + r['fatcat_release'] = r['fatcat_release'].replace('release_', '') + if metadata.get('grobid_timestamp'): + r['updated'] = metadata['grobid_timestamp'] + r['metadata'] = metadata + + if not self.s3_only: + resp = self.db.insert_grobid(self.cur, batch, on_conflict="update") + self.counts['insert-grobid'] += resp[0] + self.counts['update-grobid'] += resp[1] + + file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')] + resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update") + self.counts['insert-file-meta'] += resp[0] + self.counts['update-file-meta'] += resp[1] + + self.db.commit() + + return [] + + +class PersistGrobidDiskWorker(SandcrawlerWorker): + """ + Writes blobs out to disk. + + This could be refactored into a "Sink" type with an even thinner wrapper. + """ + + def __init__(self, output_dir): + super().__init__() + self.output_dir = output_dir + + def _blob_path(self, sha1hex, extension=".tei.xml"): + obj_path = "{}/{}/{}{}".format( + sha1hex[0:2], + sha1hex[2:4], + sha1hex, + extension, + ) + return obj_path + + def process(self, record, key=None): + + if record.get('status_code') != 200 or not record.get('tei_xml'): + return False + assert(len(record['key'])) == 40 + p = "{}/{}".format(self.output_dir, self._blob_path(record['key'])) + os.makedirs(os.path.dirname(p), exist_ok=True) + with open(p, 'w') as f: + f.write(record.pop('tei_xml')) + self.counts['written'] += 1 + return record + + +class PersistPdfTrioWorker(SandcrawlerWorker): + + def __init__(self, db_url, **kwargs): + super().__init__() + self.db = SandcrawlerPostgresClient(db_url) + self.cur = self.db.conn.cursor() + + def process(self, record, key=None): + """ + Only do batches (as transactions) + """ + raise NotImplementedError + + def push_batch(self, batch): + self.counts['total'] += len(batch) + + batch = [r for r in batch if 'pdf_trio' in r and r['pdf_trio'].get('status_code')] + for r in batch: + # copy key (sha1hex) into sub-object + r['pdf_trio']['key'] = r['key'] + pdftrio_batch = [r['pdf_trio'] for r in batch] + resp = self.db.insert_pdftrio(self.cur, pdftrio_batch, on_conflict="update") + self.counts['insert-pdftrio'] += resp[0] + self.counts['update-pdftrio'] += resp[1] + + file_meta_batch = [r['file_meta'] for r in batch if r['pdf_trio']['status'] == "success" and r.get('file_meta')] + resp = self.db.insert_file_meta(self.cur, file_meta_batch) + self.counts['insert-file-meta'] += resp[0] + self.counts['update-file-meta'] += resp[1] + + self.db.commit() + return [] + + +class PersistPdfTextWorker(SandcrawlerWorker): + """ + Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table. + + Should keep batch sizes small. 
+ """ + + def __init__(self, db_url, **kwargs): + super().__init__() + self.s3 = SandcrawlerMinioClient( + host_url=kwargs.get('s3_url', 'localhost:9000'), + access_key=kwargs['s3_access_key'], + secret_key=kwargs['s3_secret_key'], + default_bucket=kwargs['s3_bucket'], + ) + self.s3_only = kwargs.get('s3_only', False) + self.db_only = kwargs.get('db_only', False) + assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed" + if not self.s3_only: + self.db = SandcrawlerPostgresClient(db_url) + self.cur = self.db.conn.cursor() + else: + self.db = None + self.cur = None + + def process(self, record, key=None): + """ + Only do batches (as transactions) + """ + raise NotImplementedError + + def push_batch(self, batch): + self.counts['total'] += len(batch) + + parsed_batch = [] + for r in batch: + parsed_batch.append(PdfExtractResult.from_pdftext_dict(r)) + + for r in parsed_batch: + if r.status != 'success' or not r.text: + self.counts['s3-skip-status'] += 1 + if r.error_msg: + r.metadata = {'error_msg': r.error_msg[:500]} + continue + + assert len(r.sha1hex) == 40 + if not self.db_only: + resp = self.s3.put_blob( + folder="text", + blob=r.text, + sha1hex=r.sha1hex, + extension=".txt", + ) + self.counts['s3-put'] += 1 + + if not self.s3_only: + resp = self.db.insert_pdf_meta(self.cur, parsed_batch, on_conflict="update") + self.counts['insert-pdf-meta'] += resp[0] + self.counts['update-pdf-meta'] += resp[1] + + file_meta_batch = [r.file_meta for r in parsed_batch if r.file_meta] + resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update") + self.counts['insert-file-meta'] += resp[0] + self.counts['update-file-meta'] += resp[1] + + self.db.commit() + + return [] + + +class PersistThumbnailWorker(SandcrawlerWorker): + """ + Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL + table. + + This worker *must* be used with raw kakfa mode; thumbnails are *not* + wrapped in JSON like most sandcrawler kafka messages. + """ + + def __init__(self, **kwargs): + super().__init__() + self.s3 = SandcrawlerMinioClient( + host_url=kwargs.get('s3_url', 'localhost:9000'), + access_key=kwargs['s3_access_key'], + secret_key=kwargs['s3_secret_key'], + default_bucket=kwargs['s3_bucket'], + ) + self.s3_extension = kwargs.get('s3_extension', ".jpg") + self.s3_folder = kwargs.get('s3_folder', "pdf") + + def process(self, blob: bytes, key: Optional[str] = None): + """ + Processing raw messages, not decoded JSON objects + """ + + if isinstance(key, bytes): + key = key.decode('utf-8') + assert key is not None and len(key) == 40 and isinstance(key, str) + assert isinstance(blob, bytes) + assert len(blob) >= 50 + + resp = self.s3.put_blob( + folder=self.s3_folder, + blob=blob, + sha1hex=key, + extension=self.s3_extension, + ) + self.counts['s3-put'] += 1 + + +class GenericPersistDocWorker(SandcrawlerWorker): + """ + Pushes blobs from Kafka to S3. + + Objects are assumed to be JSON-wrapped strings. 
+ """ + + def __init__(self, **kwargs): + super().__init__() + self.s3 = SandcrawlerMinioClient( + host_url=kwargs.get('s3_url', 'localhost:9000'), + access_key=kwargs['s3_access_key'], + secret_key=kwargs['s3_secret_key'], + default_bucket=kwargs['s3_bucket'], + ) + self.s3_extension = kwargs.get('s3_extension', ".unknown") + self.s3_folder = kwargs.get('s3_folder', "unknown") + self.doc_key = "unknown" + + def process(self, record: dict, key: Optional[AnyStr] = None) -> None: + + if record.get('status') != 'success' or not record.get(self.doc_key): + return + + assert key is not None + if isinstance(key, bytes): + key_str = key.decode('utf-8') + elif isinstance(key, str): + key_str = key + assert len(key_str) == 40 + if 'sha1hex' in record: + assert key_str == record['sha1hex'] + + resp = self.s3.put_blob( + folder=self.s3_folder, + blob=record[self.doc_key].encode('utf-8'), + sha1hex=key_str, + extension=self.s3_extension, + ) + self.counts['s3-put'] += 1 + + +class PersistXmlDocWorker(GenericPersistDocWorker): + """ + Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".jats.xml") + self.s3_folder = kwargs.get('s3_folder', "xml_doc") + self.doc_key = "jats_xml" + + +class PersistHtmlTeiXmlWorker(GenericPersistDocWorker): + """ + Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to + sandcrawler database (SQL). + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.s3_extension = kwargs.get('s3_extension', ".tei.xml") + self.s3_folder = kwargs.get('s3_folder', "html_body") + self.doc_key = "tei_xml" diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py new file mode 100644 index 0000000..37e3d7a --- /dev/null +++ b/python/sandcrawler/workers.py @@ -0,0 +1,625 @@ + +import sys +import json +import time +import signal +import zipfile +import requests +import multiprocessing.pool +from collections import Counter +from confluent_kafka import Consumer, Producer, KafkaException + +from .misc import parse_cdx_line +from .ia import SandcrawlerBackoffError, WaybackError, WaybackContentError, PetaboxError + + +class SandcrawlerWorker(object): + """ + Base class for sandcrawler workers. + + Usually these get "pushed" into by a RecordPusher. Output goes to another + worker (pipeline-style), or defaults to stdout. + """ + + def __init__(self): + self.counts = Counter() + self.sink = None + # TODO: self.counters + + def push_record(self, task, key=None): + self.counts['total'] += 1 + if not self.want(task): + self.counts['skip'] += 1 + return + result = self.process(task, key=key) + if not result: + self.counts['failed'] += 1 + return + elif type(result) == dict and 'status' in result and len(result['status']) < 32: + self.counts[result['status']] += 1 + + if self.sink: + self.sink.push_record(result) + self.counts['pushed'] += 1 + else: + print(json.dumps(result)) + return result + + def timeout_response(self, task): + """ + This should be overridden by workers that want to return something + meaningful when there is a processing timeout. Eg, JSON vs some other + error message. + """ + return None + + def push_record_timeout(self, task, key=None, timeout=300): + """ + A wrapper around self.push_record which sets a timeout. + + Note that this uses signals and *will behave wrong/weirdly* with + multithreading or if signal-based timeouts are used elsewhere in the + same process. 
+ """ + + def timeout_handler(signum, frame): + raise TimeoutError("timeout processing record") + signal.signal(signal.SIGALRM, timeout_handler) + resp = None + signal.alarm(int(timeout)) + try: + resp = self.push_record(task, key=key) + except TimeoutError: + self.counts['timeout'] += 1 + resp = self.timeout_response(task) # pylint: disable=assignment-from-none + # TODO: what if it is this push_record() itself that is timing out? + if resp and self.sink: + self.sink.push_record(resp) + self.counts['pushed'] += 1 + elif resp: + print(json.dumps(resp)) + finally: + signal.alarm(0) + return resp + + def push_batch(self, tasks): + results = [] + for task in tasks: + results.append(self.push_record(task)) + return results + + def finish(self): + if self.sink: + self.sink.finish() + print("Worker: {}".format(self.counts), file=sys.stderr) + return self.counts + + def want(self, task): + """ + Optionally override this as a filter in implementations. + """ + return True + + def process(self, task, key=None): + """ + Derived workers need to implement business logic here. + """ + raise NotImplementedError('implementation required') + + +class SandcrawlerFetchWorker(SandcrawlerWorker): + """ + Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg, + PDFs) from wayback, archive.org, or other sources. + """ + + def __init__(self, wayback_client, **kwargs): + super().__init__(**kwargs) + self.wayback_client = wayback_client + + def fetch_blob(self, record): + start_process = time.time() + default_key = record['sha1hex'] + wayback_sec = None + petabox_sec = None + + if record.get('warc_path') and record.get('warc_offset'): + # it's a full CDX dict. fetch using WaybackClient + if not self.wayback_client: + raise Exception("wayback client not configured for this PdfTrioWorker") + try: + start = time.time() + blob = self.wayback_client.fetch_petabox_body( + csize=record['warc_csize'], + offset=record['warc_offset'], + warc_path=record['warc_path'], + ) + wayback_sec = time.time() - start + except (WaybackError, WaybackContentError, PetaboxError, KeyError) as we: + return dict( + key=default_key, + source=record, + status="error-wayback", + error_msg=str(we), + ) + elif record.get('url') and record.get('datetime'): + # it's a partial CDX dict or something? 
fetch using WaybackClient + if not self.wayback_client: + raise Exception("wayback client not configured for this PdfTrioWorker") + try: + start = time.time() + blob = self.wayback_client.fetch_replay_body( + url=record['url'], + datetime=record['datetime'], + ) + wayback_sec = time.time() - start + except (WaybackError, WaybackContentError) as we: + return dict( + key=default_key, + source=record, + status="error-wayback", + error_msg=str(we), + ) + elif record.get('item') and record.get('path'): + # it's petabox link; fetch via HTTP + start = time.time() + resp = requests.get("https://archive.org/serve/{}/{}".format( + record['item'], record['path'])) + petabox_sec = time.time() - start + try: + resp.raise_for_status() + except Exception as e: + return dict( + key=default_key, + source=record, + status="error-petabox", + error_msg=str(e), + ) + blob = resp.content + else: + raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") + if not blob: + return dict( + key=default_key, + source=record, + status="empty-blob", + ) + return dict( + key=default_key, + status="success", + source=record, + blob=blob, + ) + +class MultiprocessWrapper(SandcrawlerWorker): + + def __init__(self, worker, sink, jobs=None): + self.counts = Counter() + self.worker = worker + self.sink = sink + self.pool = multiprocessing.pool.Pool(jobs) + + def push_batch(self, tasks): + self.counts['total'] += len(tasks) + print("... processing batch of: {}".format(len(tasks)), file=sys.stderr) + results = self.pool.map(self.worker.process, tasks) + for result in results: + if not result: + self.counts['failed'] += 1 + return + elif type(result) == dict and 'status' in result and len(result['status']) < 32: + self.counts[result['status']] += 1 + + if self.sink: + self.sink.push_record(result) + self.counts['pushed'] += 1 + else: + print(json.dumps(result)) + return results + + def finish(self): + self.pool.terminate() + if self.sink: + self.sink.finish() + worker_counts = self.worker.finish() + print("Multiprocessing: {}".format(self.counts), file=sys.stderr) + return worker_counts + +class BlackholeSink(SandcrawlerWorker): + """ + Dummy SandcrawlerWorker. That doesn't do or process anything. + + Useful for tests. + """ + + def push_record(self, task, key=None): + return + + def push_batch(self, tasks): + return + +class KafkaSink(SandcrawlerWorker): + + def __init__(self, kafka_hosts, produce_topic, **kwargs): + self.sink = None + self.counts = Counter() + self.produce_topic = produce_topic + self.kafka_hosts = kafka_hosts + + config = self.producer_config({ + 'bootstrap.servers': kafka_hosts, + 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes + 'api.version.request': True, + 'api.version.fallback.ms': 0, + }) + self.producer = Producer(config) + + + @staticmethod + def _fail_fast(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err), file=sys.stderr) + print("Bailing out...", file=sys.stderr) + # TODO: should it be sys.exit(-1)? 
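+            # raising here surfaces the delivery failure to the producer.poll()/flush() call that services delivery callbacks, rather than dropping the message silently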
+ raise KafkaException(err) + + def producer_config(self, kafka_config): + config = kafka_config.copy() + config.update({ + 'delivery.report.only.error': True, + 'default.topic.config': { + 'message.timeout.ms': 30000, + 'request.required.acks': -1, # all brokers must confirm + } + }) + return config + + def push_record(self, msg, key=None): + self.counts['total'] += 1 + if type(msg) == dict: + if not key and 'key' in msg: + key = msg['key'] + msg = json.dumps(msg) + if type(msg) == str: + msg = msg.encode('utf-8') + assert type(msg) == bytes + + self.producer.produce( + self.produce_topic, + msg, + key=key, + on_delivery=self._fail_fast) + self.counts['produced'] += 1 + + # check for errors etc + self.producer.poll(0) + + def push_batch(self, msgs): + for m in msgs: + self.push_record(m) + + def finish(self): + self.producer.flush() + return self.counts + + +class KafkaCompressSink(KafkaSink): + """ + Variant of KafkaSink for large documents. Used for, eg, GROBID output. + """ + + def producer_config(self, kafka_config): + config = kafka_config.copy() + config.update({ + 'compression.codec': 'gzip', + 'retry.backoff.ms': 250, + 'linger.ms': 1000, + 'batch.num.messages': 50, + 'delivery.report.only.error': True, + 'default.topic.config': { + 'message.timeout.ms': 30000, + 'request.required.acks': -1, # all brokers must confirm + } + }) + return config + + +class RecordPusher: + """ + Base class for different record sources to be pushed into workers. Pretty + trivial interface, just wraps an importer and pushes records in to it. + """ + + def __init__(self, worker, **kwargs): + self.counts = Counter() + self.worker = worker + + def run(self): + """ + This will look something like: + + for line in sys.stdin: + record = json.loads(line) + self.worker.push_record(record) + print(self.worker.finish()) + """ + raise NotImplementedError + + +class JsonLinePusher(RecordPusher): + + def __init__(self, worker, json_file, **kwargs): + self.counts = Counter() + self.worker = worker + self.json_file = json_file + self.batch_size = kwargs.get('batch_size', None) + if self.batch_size in (0, 1): + self.batch_size = None + + def run(self): + batch = [] + for line in self.json_file: + if not line: + continue + self.counts['total'] += 1 + try: + record = json.loads(line) + except json.decoder.JSONDecodeError: + self.counts['error-json-decode'] += 1 + continue + if self.batch_size: + batch.append(record) + if len(batch) >= self.batch_size: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] + else: + self.worker.push_record(record) + self.counts['pushed'] += 1 + if self.batch_size and batch: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] + worker_counts = self.worker.finish() + print("JSON lines pushed: {}".format(self.counts), file=sys.stderr) + return self.counts + + +class CdxLinePusher(RecordPusher): + + def __init__(self, worker, cdx_file, **kwargs): + self.counts = Counter() + self.worker = worker + self.cdx_file = cdx_file + self.filter_http_statuses = kwargs.get('filter_http_statuses', None) + self.filter_mimetypes = kwargs.get('filter_mimetypes', None) + self.allow_octet_stream = kwargs.get('allow_octet_stream', False) + self.batch_size = kwargs.get('batch_size', None) + if self.batch_size in (0, 1): + self.batch_size = None + + def run(self): + batch = [] + for line in self.cdx_file: + if not line: + continue + self.counts['total'] += 1 + record = parse_cdx_line(line, normalize=True) + if not record: + self.counts['skip-parse'] += 1 
+ continue + if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses: + self.counts['skip-http_status'] += 1 + continue + if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes: + self.counts['skip-mimetype'] += 1 + continue + if self.batch_size: + batch.append(record) + if len(batch) >= self.batch_size: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] + else: + self.worker.push_record(record) + self.counts['pushed'] += 1 + if self.batch_size and batch: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] + worker_counts = self.worker.finish() + print("CDX lines pushed: {}".format(self.counts), file=sys.stderr) + return self.counts + + +class ZipfilePusher(RecordPusher): + + def __init__(self, worker, zipfile_path, **kwargs): + self.counts = Counter() + self.worker = worker + self.filter_suffix = ".pdf" + self.zipfile_path = zipfile_path + self.batch_size = kwargs.get('batch_size', None) + if self.batch_size in (0, 1): + self.batch_size = None + + def run(self): + batch = [] + with zipfile.ZipFile(self.zipfile_path, 'r') as archive: + for zipinfo in archive.infolist(): + if not zipinfo.filename.endswith(self.filter_suffix): + continue + self.counts['total'] += 1 + # NB doesn't really extract the file, just gives you a stream (file-like-object) for reading it + flo = archive.open(zipinfo, 'r') + data = flo.read(2**32) + flo.close() + if self.batch_size: + batch.append(data) + if len(batch) >= self.batch_size: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] + else: + self.worker.push_record(data) + self.counts['pushed'] += 1 + if self.batch_size and batch: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] + worker_counts = self.worker.finish() + print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr) + return self.counts + +class KafkaJsonPusher(RecordPusher): + + def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs): + self.counts = Counter() + self.worker = worker + self.consumer = make_kafka_consumer( + kafka_hosts, + consume_topic, + group, + ) + self.push_batches = kwargs.get('push_batches', False) + self.raw_records = kwargs.get('raw_records', False) + self.poll_interval = kwargs.get('poll_interval', 5.0) + self.batch_size = kwargs.get('batch_size', 100) + if self.batch_size in (0, 1): + self.batch_size = 1 + self.batch_worker = kwargs.get('batch_worker', False) + self.process_timeout_sec = kwargs.get('process_timeout_sec', 300) + + def run(self): + while True: + # TODO: this is batch-oriented, because underlying worker is + # often batch-oriented, but this doesn't confirm that entire batch + # has been pushed to fatcat before commiting offset. Eg, consider + # case where there there is one update and thousands of creates; + # update would be lingering in worker, and if worker crashed + # never created. Not great. + batch = self.consumer.consume( + num_messages=self.batch_size, + timeout=self.poll_interval) + print("... got {} kafka messages ({}sec poll interval)".format( + len(batch), self.poll_interval), + file=sys.stderr) + if not batch: + # TODO: could have some larger timeout here and + # self.worker.finish() if it's been more than, eg, a couple + # minutes + continue + # first check errors on entire batch... + for msg in batch: + if msg.error(): + raise KafkaException(msg.error()) + # ... 
then process + if self.push_batches: + self.counts['total'] += len(batch) + records = [json.loads(msg.value().decode('utf-8')) for msg in batch] + self.worker.push_batch(records) + self.counts['pushed'] += len(batch) + print("Import counts: {}".format(self.worker.counts), file=sys.stderr) + else: + for msg in batch: + self.counts['total'] += 1 + if self.raw_records: + # In this mode, pass the Kafka message as bytes through + # without decoding as JSON. Eg, for thumbnails (where + # message bytes are JPEG, and we need # the sha1hex key + # from the message) + record = msg.value() + else: + record = json.loads(msg.value().decode('utf-8')) + # This complex bit of code implements backoff/backpressure + # in a way that will not cause this Kafka consumer to lose + # partition assignments (resulting in a rebalance). This + # was needed for the ingest workers. There is probably a + # better way to structure this concurrency. + done = False + while not done: + try: + # use timeouts; don't want kafka itself to timeout + self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec) + break + except SandcrawlerBackoffError as be: + print("Backing off for 200 seconds: {}".format(be)) + self.consumer.pause(self.consumer.assignment()) + for i in range(40): + # Beware this poll which should not be + # receiving any messages because we are paused! + empty_batch = self.consumer.poll(0) + assert not empty_batch + time.sleep(5) + self.consumer.resume(self.consumer.assignment()) + self.counts['pushed'] += 1 + if self.counts['total'] % 500 == 0: + print("Import counts: {}".format(self.worker.counts), file=sys.stderr) + for msg in batch: + # locally store offsets of processed messages; will be + # auto-commited by librdkafka from this "stored" value + self.consumer.store_offsets(message=msg) + + # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or + # commit the current batch if it has been lingering + worker_counts = self.worker.finish() + print("KafkaJson lines pushed: {}".format(self.counts), file=sys.stderr) + self.consumer.close() + return self.counts + + +def make_kafka_consumer(hosts, consume_topic, group): + topic_name = consume_topic + + def fail_fast(err, partitions): + if err is not None: + print("Kafka consumer commit error: {}".format(err), file=sys.stderr) + print("Bailing out...", file=sys.stderr) + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + for p in partitions: + # check for partition-specific commit errors + if p.error: + print("Kafka consumer commit error: {}".format(p.error), file=sys.stderr) + print("Bailing out...", file=sys.stderr) + # TODO: should it be sys.exit(-1)? 
+ raise KafkaException(p.error) + #print("Kafka consumer commit successful") + pass + + # previously, using pykafka + #auto_commit_enable=True, + #auto_commit_interval_ms=30000, # 30 seconds + conf = { + 'bootstrap.servers': hosts, + 'group.id': group, + 'on_commit': fail_fast, + # messages don't have offset marked as stored until processed, + # but we do auto-commit stored offsets to broker + 'enable.auto.offset.store': False, + 'enable.auto.commit': True, + # user code timeout; if no poll after this long, assume user code + # hung and rebalance (default: 6min) + 'max.poll.interval.ms': 360000, + 'default.topic.config': { + 'auto.offset.reset': 'latest', + }, + } + + def on_rebalance(consumer, partitions): + for p in partitions: + if p.error: + raise KafkaException(p.error) + print("Kafka partitions rebalanced: {} / {}".format( + consumer, partitions), + file=sys.stderr) + + consumer = Consumer(conf) + # NOTE: it's actually important that topic_name *not* be bytes (UTF-8 + # encoded) + consumer.subscribe([topic_name], + on_assign=on_rebalance, + on_revoke=on_rebalance, + ) + print("Consuming from kafka topic {}, group {}".format(topic_name, group), file=sys.stderr) + return consumer diff --git a/python/sandcrawler/xml.py b/python/sandcrawler/xml.py new file mode 100644 index 0000000..7a0086d --- /dev/null +++ b/python/sandcrawler/xml.py @@ -0,0 +1,7 @@ + +import xml.etree.ElementTree as ET + + +def xml_reserialize(raw: bytes) -> str: + root = ET.fromstring(raw) + return '<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(root, encoding="unicode") |
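As a rough orientation for how these new modules fit together, here is a minimal wiring sketch (not part of the diff itself; the db_url value and the filter/batch settings are illustrative): a CdxLinePusher parses, normalizes, and filters CDX lines, batches them, and pushes each batch into a PersistCdxWorker, which writes the batch to the cdx SQL table as a single transaction.

    import sys
    from sandcrawler import CdxLinePusher, PersistCdxWorker

    # the worker owns the Postgres connection; push_batch() commits one transaction per batch
    worker = PersistCdxWorker(db_url="postgres:///sandcrawler")

    # the pusher reads CDX lines from any line iterable, filters, and batches them
    pusher = CdxLinePusher(
        worker,
        sys.stdin,
        filter_http_statuses=[200],  # illustrative: keep only HTTP 200 captures
        batch_size=50,
    )
    pusher.run()

The same pusher/worker pattern applies to the other persist workers (JsonLinePusher or KafkaJsonPusher feeding PersistGrobidWorker, PersistIngestFileResultWorker, and so on); only the record format and the destination tables or buckets differ.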