Diffstat (limited to 'python/sandcrawler')
-rw-r--r--  python/sandcrawler/__init__.py       10
-rw-r--r--  python/sandcrawler/db.py            418
-rw-r--r--  python/sandcrawler/grobid.py        130
-rw-r--r--  python/sandcrawler/html.py          348
-rw-r--r--  python/sandcrawler/html_ingest.py   441
-rw-r--r--  python/sandcrawler/html_metadata.py 857
-rw-r--r--  python/sandcrawler/ia.py           1138
-rw-r--r--  python/sandcrawler/ingest.py        833
-rw-r--r--  python/sandcrawler/minio.py          99
-rw-r--r--  python/sandcrawler/misc.py          222
-rw-r--r--  python/sandcrawler/pdfextract.py    470
-rw-r--r--  python/sandcrawler/pdftrio.py       130
-rw-r--r--  python/sandcrawler/persist.py       584
-rw-r--r--  python/sandcrawler/workers.py       625
-rw-r--r--  python/sandcrawler/xml.py             7
15 files changed, 6312 insertions, 0 deletions
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
new file mode 100644
index 0000000..e461462
--- /dev/null
+++ b/python/sandcrawler/__init__.py
@@ -0,0 +1,10 @@
+
+from .grobid import GrobidClient, GrobidWorker, GrobidBlobWorker
+from .pdftrio import PdfTrioClient, PdfTrioWorker, PdfTrioBlobWorker
+from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime, clean_url
+from .workers import KafkaSink, KafkaCompressSink, JsonLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper
+from .ia import WaybackClient, WaybackError, WaybackContentError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, PetaboxError, ResourceResult, WarcResource, CdxPartial, CdxRow
+from .ingest import IngestFileWorker
+from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker, PersistPdfTrioWorker, PersistIngestRequestWorker, PersistPdfTextWorker, PersistThumbnailWorker
+from .db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient
+from .pdfextract import PdfExtractWorker, PdfExtractBlobWorker
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
new file mode 100644
index 0000000..e60b310
--- /dev/null
+++ b/python/sandcrawler/db.py
@@ -0,0 +1,418 @@
+
+import json
+import datetime
+from typing import Optional
+
+import psycopg2
+import psycopg2.extras
+import requests
+
+class SandcrawlerPostgrestClient:
+
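+ # thin read-only client for a PostgREST API sitting in front of the
+ # sandcrawler SQL database; the "eq." prefixes in the query params below are
+ # PostgREST column-filter syntax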
+ def __init__(self, api_url="http://wbgrp-svc506.us.archive.org:3030", **kwargs):
+ self.api_url = api_url
+
+ def get_cdx(self, url):
+ resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url))
+ resp.raise_for_status()
+ return resp.json() or None
+
+ def get_grobid(self, sha1):
+ resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1))
+ resp.raise_for_status()
+ resp = resp.json()
+ if resp:
+ return resp[0]
+ else:
+ return None
+
+ def get_pdftrio(self, sha1):
+ resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.'+sha1))
+ resp.raise_for_status()
+ resp = resp.json()
+ if resp:
+ return resp[0]
+ else:
+ return None
+
+ def get_pdf_meta(self, sha1):
+ resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.'+sha1))
+ resp.raise_for_status()
+ resp = resp.json()
+ if resp:
+ return resp[0]
+ else:
+ return None
+
+ def get_html_meta(self, sha1hex: str) -> Optional[dict]:
+ resp = requests.get(
+ self.api_url + "/html_meta",
+ params=dict(sha1hex=f"eq.{sha1hex}"),
+ )
+ resp.raise_for_status()
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
+ else:
+ return None
+
+ def get_file_meta(self, sha1):
+ resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
+ resp.raise_for_status()
+ resp = resp.json()
+ if resp:
+ return resp[0]
+ else:
+ return None
+
+ def get_ingest_file_result(self, ingest_type: str, url: str) -> Optional[dict]:
+ resp = requests.get(
+ self.api_url + "/ingest_file_result",
+ params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"),
+ )
+ resp.raise_for_status()
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
+ else:
+ return None
+
+ def get_crossref(self, doi):
+ resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.'+doi))
+ resp.raise_for_status()
+ resp = resp.json()
+ if resp:
+ return resp[0]
+ else:
+ return None
+
+class SandcrawlerPostgresClient:
+
+ def __init__(self, db_url, **kwargs):
+ self.conn = psycopg2.connect(db_url)
+
+ def cursor(self):
+ return self.conn.cursor()
+
+ def commit(self):
+ return self.conn.commit()
+
+ def _inserts_and_updates(self, resp, on_conflict):
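+ # each insert_* helper below ends its SQL with "RETURNING xmax"; PostgreSQL
+ # returns xmax == 0 for rows that were freshly inserted and a non-zero value
+ # for rows that went through the ON CONFLICT ... DO UPDATE path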
+ resp = [int(r[0]) for r in resp]
+ inserts = len([r for r in resp if r == 0])
+ if on_conflict == "update":
+ updates = len([r for r in resp if r != 0])
+ else:
+ updates = 0
+ return (inserts, updates)
+
+ def insert_cdx(self, cur, batch, on_conflict="nothing"):
+ sql = """
+ INSERT INTO
+ cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
+ VALUES %s
+ ON CONFLICT ON CONSTRAINT cdx_pkey DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+
+ batch = [d for d in batch if d.get('warc_path')]
+ if not batch:
+ return (0, 0)
+ batch = [(d['url'],
+ d['datetime'],
+ d['sha1hex'],
+ d['mimetype'],
+ d['warc_path'],
+ int(d['warc_csize']),
+ int(d['warc_offset']))
+ for d in batch]
+ # filter out duplicate rows by key (url, datetime)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[(b[0], b[1])] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_file_meta(self, cur, batch, on_conflict="nothing"):
+ sql = """
+ INSERT INTO
+ file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype)
+ VALUES %s
+ ON CONFLICT (sha1hex) DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ sha256hex=EXCLUDED.sha256hex,
+ md5hex=EXCLUDED.md5hex,
+ size_bytes=EXCLUDED.size_bytes,
+ mimetype=EXCLUDED.mimetype
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ batch = [(d['sha1hex'],
+ d['sha256hex'],
+ d['md5hex'],
+ int(d['size_bytes']),
+ d['mimetype'])
+ for d in batch]
+ # filter out duplicate rows by key (sha1hex)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[b[0]] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_grobid(self, cur, batch, on_conflict="nothing"):
+ sql = """
+ INSERT INTO
+ grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata)
+ VALUES %s
+ ON CONFLICT (sha1hex) DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ grobid_version=EXCLUDED.grobid_version,
+ status_code=EXCLUDED.status_code,
+ status=EXCLUDED.status,
+ fatcat_release=EXCLUDED.fatcat_release,
+ updated=EXCLUDED.updated,
+ metadata=EXCLUDED.metadata
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ for r in batch:
+ if r.get('metadata'):
+ # these fields are sometimes only present inside the GROBID metadata; promote
+ # them to top-level keys, but drop them from the metadata JSON itself (to
+ # save database space)
+ dupe_fields = ('fatcat_release', 'grobid_version')
+ for k in dupe_fields:
+ if k not in r:
+ r[k] = r['metadata'].get(k)
+ r['metadata'].pop(k, None)
+ r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
+ batch = [(d['key'],
+ d.get('grobid_version') or None,
+ d['status_code'],
+ d['status'],
+ d.get('fatcat_release') or None,
+ d.get('updated') or datetime.datetime.now(),
+ d.get('metadata') or None,
+ )
+ for d in batch]
+ # filter out duplicate rows by key (sha1hex)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[b[0]] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_pdf_meta(self, cur, batch, on_conflict="nothing"):
+ """
+ batch elements are expected to have .to_sql_tuple() method
+ """
+ sql = """
+ INSERT INTO
+ pdf_meta (sha1hex, updated, status, has_page0_thumbnail, page_count, word_count, page0_height, page0_width, permanent_id, pdf_created, pdf_version, metadata)
+ VALUES %s
+ ON CONFLICT (sha1hex) DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ updated=EXCLUDED.updated,
+ status=EXCLUDED.status,
+ has_page0_thumbnail=EXCLUDED.has_page0_thumbnail,
+ page_count=EXCLUDED.page_count,
+ word_count=EXCLUDED.word_count,
+ page0_height=EXCLUDED.page0_height,
+ page0_width=EXCLUDED.page0_width,
+ permanent_id=EXCLUDED.permanent_id,
+ pdf_created=EXCLUDED.pdf_created,
+ pdf_version=EXCLUDED.pdf_version,
+ metadata=EXCLUDED.metadata
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ batch = [d.to_sql_tuple() for d in batch]
+ # filter out duplicate rows by key (sha1hex)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[b[0]] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_html_meta(self, cur, batch, on_conflict="nothing"):
+ """
+ batch elements are expected to have .to_sql_tuple() method
+ """
+ sql = """
+ INSERT INTO
+ html_meta (sha1hex, updated, status, scope, has_teixml, has_thumbnail, word_count, biblio, resources)
+ VALUES %s
+ ON CONFLICT (sha1hex) DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ updated=EXCLUDED.updated,
+ status=EXCLUDED.status,
+ scope=EXCLUDED.scope,
+ has_teixml=EXCLUDED.has_teixml,
+ has_thumbnail=EXCLUDED.has_thumbnail,
+ word_count=EXCLUDED.word_count,
+ biblio=EXCLUDED.biblio,
+ resources=EXCLUDED.resources
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ batch = [d.to_sql_tuple() for d in batch]
+ # filter out duplicate rows by key (sha1hex)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[b[0]] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_pdftrio(self, cur, batch, on_conflict="nothing"):
+ sql = """
+ INSERT INTO
+ pdftrio (sha1hex, updated, status_code, status, pdftrio_version,
+ models_date, ensemble_score, bert_score, linear_score,
+ image_score)
+ VALUES %s
+ ON CONFLICT (sha1hex) DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ updated=EXCLUDED.updated,
+ status_code=EXCLUDED.status_code,
+ status=EXCLUDED.status,
+ pdftrio_version=EXCLUDED.pdftrio_version,
+ models_date=EXCLUDED.models_date,
+ ensemble_score=EXCLUDED.ensemble_score,
+ bert_score=EXCLUDED.bert_score,
+ linear_score=EXCLUDED.linear_score,
+ image_score=EXCLUDED.image_score
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ batch = [
+ (
+ d['key'],
+ d.get('updated') or datetime.datetime.now(),
+ d['status_code'],
+ d['status'],
+ d.get('versions', {}).get('pdftrio_version') or None,
+ d.get('versions', {}).get('models_date') or None,
+ d.get('ensemble_score'),
+ d.get('bert_score'),
+ d.get('linear_score'),
+ d.get('image_score'),
+ )
+ for d in batch]
+ # filter out duplicate rows by key (sha1hex)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[b[0]] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_ingest_request(self, cur, batch, on_conflict="nothing"):
+ sql = """
+ INSERT INTO
+ ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request)
+ VALUES %s
+ ON CONFLICT ON CONSTRAINT ingest_request_pkey DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ for r in batch:
+ # in case these fields were already packed into 'request'
+ extra = r.get('request', {})
+ for k in ('ext_ids', 'fatcat_release', 'edit_extra', 'rel'):
+ if r.get(k):
+ extra[k] = r[k]
+ if extra:
+ r['extra'] = json.dumps(extra, sort_keys=True)
+ batch = [(d['link_source'],
+ d['link_source_id'],
+ d['ingest_type'],
+ d['base_url'],
+ d.get('ingest_request_source'),
+ d.get('release_stage') or None,
+ d.get('extra') or None,
+ )
+ for d in batch]
+ # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[(b[0], b[1], b[2], b[3])] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"):
+ sql = """
+ INSERT INTO
+ ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
+ VALUES %s
+ ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ updated=now(),
+ hit=EXCLUDED.hit,
+ status=EXCLUDED.status,
+ terminal_url=EXCLUDED.terminal_url,
+ terminal_dt=EXCLUDED.terminal_dt,
+ terminal_status_code=EXCLUDED.terminal_status_code,
+ terminal_sha1hex=EXCLUDED.terminal_sha1hex
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ batch = [(d['ingest_type'],
+ d['base_url'],
+ bool(d['hit']),
+ d['status'],
+ d.get('terminal_url'),
+ d.get('terminal_dt'),
+ d.get('terminal_status_code'),
+ d.get('terminal_sha1hex'),
+ )
+ for d in batch]
+ # filter out duplicate rows by key (ingest_type, base_url)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[(b[0], b[1])] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
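A minimal usage sketch for the two clients above (the PostgREST endpoint, the connection string, and the hash values are placeholders, not real records):

    from sandcrawler.db import SandcrawlerPostgrestClient, SandcrawlerPostgresClient

    # read-only lookups via PostgREST
    api = SandcrawlerPostgrestClient(api_url="http://localhost:3030")
    print(api.get_file_meta("da39a3ee5e6b4b0d3255bfef95601890afd80709"))

    # batch inserts go straight to PostgreSQL via psycopg2
    db = SandcrawlerPostgresClient("postgresql:///sandcrawler")
    cur = db.cursor()
    counts = db.insert_file_meta(cur, [{
        "sha1hex": "da39a3ee5e6b4b0d3255bfef95601890afd80709",
        "sha256hex": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
        "md5hex": "d41d8cd98f00b204e9800998ecf8427e",
        "size_bytes": 0,
        "mimetype": "application/pdf",
    }], on_conflict="update")
    db.commit()
    print(counts)  # (insert_count, update_count)
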
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
new file mode 100644
index 0000000..b4215dc
--- /dev/null
+++ b/python/sandcrawler/grobid.py
@@ -0,0 +1,130 @@
+
+import requests
+
+from grobid2json import teixml2json
+from .workers import SandcrawlerWorker, SandcrawlerFetchWorker
+from .misc import gen_file_metadata
+
+class GrobidClient(object):
+
+ def __init__(self, host_url="http://grobid.qa.fatcat.wiki", **kwargs):
+ self.host_url = host_url
+ self.consolidate_mode = int(kwargs.get('consolidate_mode', 0))
+
+ def process_fulltext(self, blob, consolidate_mode=None):
+ """
+ Returns dict with keys:
+ - status_code
+ - status (slug)
+ - error_msg (if status == 'error')
+ - tei_xml (if status is 200)
+
+ TODO: persist connection for performance?
+ """
+ assert blob
+
+ if consolidate_mode is None:
+ consolidate_mode = self.consolidate_mode
+
+ try:
+ grobid_response = requests.post(
+ self.host_url + "/api/processFulltextDocument",
+ files={
+ 'input': blob,
+ 'consolidateHeader': consolidate_mode,
+ 'consolidateCitations': 0, # too expensive for now
+ 'includeRawCitations': 1,
+ },
+ timeout=180.0,
+ )
+ except requests.Timeout:
+ return {
+ 'status': 'error-timeout',
+ 'status_code': -4, # heritrix3 "HTTP timeout" code
+ 'error_msg': 'GROBID request (HTTP POST) timeout',
+ }
+
+ info = dict(
+ status_code=grobid_response.status_code,
+ )
+ if grobid_response.status_code == 200:
+ info['status'] = 'success'
+ info['tei_xml'] = grobid_response.text
+ if len(info['tei_xml']) > 12000000:
+ # XML is larger than Kafka message size, and much larger than
+ # an article in general; bail out
+ info['status'] = 'error'
+ info['error_msg'] = "response XML too large: {} bytes".format(len(info['tei_xml']))
+ info.pop('tei_xml')
+ else:
+ # response.text is .content decoded as utf-8
+ info['status'] = 'error'
+ info['error_msg'] = grobid_response.text[:10000]
+ return info
+
+ def metadata(self, result):
+ if result['status'] != 'success':
+ return None
+ tei_json = teixml2json(result['tei_xml'], encumbered=False)
+ meta = dict()
+ biblio = dict()
+ for k in ('title', 'authors', 'journal', 'date', 'doi', ):
+ if tei_json.get(k):
+ biblio[k] = tei_json[k]
+ meta['biblio'] = biblio
+ for k in ('grobid_version', 'grobid_timestamp', 'fatcat_release', 'language_code'):
+ if tei_json.get(k):
+ meta[k] = tei_json[k]
+ return meta
+
+class GrobidWorker(SandcrawlerFetchWorker):
+
+ def __init__(self, grobid_client, wayback_client=None, sink=None, **kwargs):
+ super().__init__(wayback_client=wayback_client)
+ self.grobid_client = grobid_client
+ self.sink = sink
+ self.consolidate_mode = 0
+
+ def timeout_response(self, task):
+ default_key = task['sha1hex']
+ return dict(
+ status="error-timeout",
+ error_msg="internal GROBID worker timeout",
+ source=task,
+ key=default_key,
+ )
+
+ def process(self, record, key=None):
+ default_key = record['sha1hex']
+
+ fetch_result = self.fetch_blob(record)
+ if fetch_result['status'] != 'success':
+ return fetch_result
+ blob = fetch_result['blob']
+
+ result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
+ result['file_meta'] = gen_file_metadata(blob)
+ result['source'] = record
+ result['key'] = result['file_meta']['sha1hex']
+ return result
+
+class GrobidBlobWorker(SandcrawlerWorker):
+ """
+ This is sort of like GrobidWorker, except it receives blobs directly,
+ instead of fetching blobs from some remote store.
+ """
+
+ def __init__(self, grobid_client, sink=None, **kwargs):
+ super().__init__()
+ self.grobid_client = grobid_client
+ self.sink = sink
+ self.consolidate_mode = 0
+
+ def process(self, blob, key=None):
+ if not blob:
+ return None
+ result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
+ result['file_meta'] = gen_file_metadata(blob)
+ result['key'] = result['file_meta']['sha1hex']
+ return result
+
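A quick sketch of driving the GROBID client directly on a local PDF blob (the host URL and file path are placeholders; in the pipeline this normally happens inside GrobidWorker against bytes fetched from wayback):

    from sandcrawler.grobid import GrobidClient

    client = GrobidClient(host_url="http://localhost:8070")
    with open("paper.pdf", "rb") as f:
        result = client.process_fulltext(f.read())
    print(result["status_code"], result["status"])
    if result["status"] == "success":
        # pulls title/authors/journal/date/doi etc out of the TEI-XML
        print(client.metadata(result))
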
diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py
new file mode 100644
index 0000000..cd0a8e8
--- /dev/null
+++ b/python/sandcrawler/html.py
@@ -0,0 +1,348 @@
+
+import re
+import sys
+import json
+import urllib.parse
+
+from bs4 import BeautifulSoup
+
+RESEARCHSQUARE_REGEX = re.compile(r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"')
+IEEEXPLORE_REGEX = re.compile(r'"pdfPath":"(/.*?\.pdf)"')
+OVID_JOURNAL_URL_REGEX = re.compile(r'journalURL = "(http.*)";')
+SCIENCEDIRECT_BOUNCE_URL_REGEX = re.compile(r"window.location = '(http.*)';")
+
+
+def extract_fulltext_url(html_url, html_body):
+ """
+ Takes an HTML document (and URL), assumed to be a landing page, and tries
+ to find a fulltext PDF url.
+
+ On error, or if fails to extract a URL, returns an empty dict.
+ """
+
+ host_prefix = '/'.join(html_url.split('/')[:3])
+ try:
+ soup = BeautifulSoup(html_body, 'html.parser')
+ except TypeError as te:
+ print(f"{te} (url={html_url})", file=sys.stderr)
+ return dict()
+ except UnboundLocalError as ule:
+ print(f"{ule} (url={html_url})", file=sys.stderr)
+ return dict()
+
+ ### General Tricks ###
+
+ # highwire-style meta tag
+ meta = soup.find('meta', attrs={"name":"citation_pdf_url"})
+ if not meta:
+ meta = soup.find('meta', attrs={"name":"bepress_citation_pdf_url"})
+ if not meta:
+ meta = soup.find('meta', attrs={"name":"wkhealth_pdf_url"})
+ if not meta:
+ # researchgate does this; maybe others also?
+ meta = soup.find('meta', attrs={"property":"citation_pdf_url"})
+ if not meta:
+ meta = soup.find('meta', attrs={"name":"eprints.document_url"})
+ # if tag is only partially populated
+ if meta and not meta.get('content'):
+ meta = None
+ # wiley has a weird almost-blank page we don't want to loop on
+ if meta and "://onlinelibrary.wiley.com/doi/pdf/" not in html_url:
+ url = meta['content'].strip()
+ if '://doi.org/' in url:
+ print(f"\tdoi.org in citation_pdf_url (loop?): {url}", file=sys.stderr)
+ elif url.startswith('/'):
+ if host_prefix+url == html_url:
+ print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr)
+ else:
+ return dict(pdf_url=host_prefix+url, technique='citation_pdf_url')
+ elif url.startswith('http'):
+ if url == html_url:
+ print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr)
+ else:
+ return dict(pdf_url=url, technique='citation_pdf_url')
+ else:
+ print("\tmalformed citation_pdf_url? {}".format(url), file=sys.stderr)
+
+ meta = soup.find('meta', attrs={"name":"generator"})
+ meta_generator = None
+ if meta and meta.get('content'):
+ meta_generator = meta['content'].strip()
+
+ ### Publisher/Platform Specific ###
+
+ # research square (researchsquare.com)
+ if 'researchsquare.com/article/' in html_url:
+ # JSON in body with a field like:
+ # "url":"https://assets.researchsquare.com/files/4a57970e-b002-4608-b507-b95967649483/v2/Manuscript.pdf"
+ m = RESEARCHSQUARE_REGEX.search(html_body.decode('utf-8'))
+ if m:
+ url = m.group(1)
+ assert len(url) < 4096
+ return dict(release_stage="manuscript", pdf_url=url, technique='publisher')
+
+ # elsevier linking hub
+ # https://linkinghub.elsevier.com/retrieve/pii/S1569199319308975
+ if '://linkinghub.elsevier.com/retrieve/pii/' in html_url:
+ # <input type="hidden" name="redirectURL" value="http%3A%2F%2Fcysticfibrosisjournal.com%2Fretrieve%2Fpii%2FS1569199319308975" id="redirectURL"/>
+ redirect = soup.find("input", attrs={"name": "redirectURL"})
+ if redirect:
+ url = redirect['value'].strip()
+ if 'http' in url:
+ url = urllib.parse.unquote(url)
+ # drop the '?via=...' query parameter, if present
+ url = url.split('?via')[0]
+ return dict(next_url=url, technique="elsevier-linkinghub")
+
+ # sciencedirect PDF URL extract
+ # https://www.sciencedirect.com/science/article/pii/S0169204621000670
+ if 'sciencedirect.com/science/article/pii/' in html_url and not html_url.endswith(".pdf"):
+ json_tag = soup.find("script", attrs={"type": "application/json", "data-iso-key": "_0"})
+ url = None
+ if json_tag:
+ try:
+ json_text = json_tag.string
+ json_meta = json.loads(json_text)
+ pdf_meta = json_meta['article']['pdfDownload']['urlMetadata']
+ # https://www.sciencedirect.com/science/article/pii/S0169204621000670/pdfft?md5=c4a83d06b334b627ded74cf9423bfa56&pid=1-s2.0-S0169204621000670-main.pdf
+ url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams']['md5'] + "&pid=" + pdf_meta['queryParams']['pid']
+ except (KeyError, TypeError, json.JSONDecodeError):
+ pass
+ if url:
+ return dict(pdf_url=url, technique="sciencedirect-munge-json")
+
+ # sciencedirect PDF bounce page
+ # https://www.sciencedirect.com/science/article/pii/S2590109519300424/pdfft?md5=854f43a44de186eb58674b8e20631691&pid=1-s2.0-S2590109519300424-main.pdf
+ if '://www.sciencedirect.com/' in html_url and html_url.endswith(".pdf"):
+ # window.location = 'https://pdf.sciencedirectassets.com/320270/AIP/1-s2.0-S2590109519300424/main.pdf?X-Amz-Security-Token=[...]&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20200110T210936Z&X-Amz-SignedHeaders=host&X-Amz-Expires=300&X-Amz-Credential=ASIAQ3PHCVTY23CMDBNC%2F20200110%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=[...]&hash=[...]&host=[...]&pii=S2590109519300424&tid=spdf-74468ebd-6be6-43ac-b294-ced86e8eea58&sid=[...]&type=client';
+ m = SCIENCEDIRECT_BOUNCE_URL_REGEX.search(html_body.decode('utf-8'))
+ if m:
+ url = m.group(1)
+ assert len(url) < 4000
+ return dict(pdf_url=url, technique="sciencedirect-bounce")
+
+ # ieeexplore.ieee.org
+ # https://ieeexplore.ieee.org/document/8730316
+ if '://ieeexplore.ieee.org/document/' in html_url:
+ # JSON in body with a field like:
+ # "pdfPath":"/iel7/6287639/8600701/08730316.pdf",
+ m = IEEEXPLORE_REGEX.search(html_body.decode('utf-8'))
+ if m:
+ url = m.group(1)
+ assert len(url) < 4096
+ return dict(release_stage="published", pdf_url=host_prefix+url, technique="ieeexplore")
+ # https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=8730313
+ if '://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber' in html_url:
+ # HTML iframe like:
+ # <iframe src="http://web.archive.org/web/20191026011528if_/https://ieeexplore.ieee.org/ielx7/6287639/8600701/08730313.pdf?tp=&amp;arnumber=8730313&amp;isnumber=8600701&amp;ref=" frameborder="0"></iframe>
+ iframe = soup.find("iframe")
+ if iframe and '.pdf' in iframe['src']:
+ return dict(pdf_url=iframe['src'], technique="iframe")
+
+ # https://insights.ovid.com/crossref?an=00042307-202001000-00013
+ # Ovid is some kind of landing page bounce portal tracking run-around.
+ # Can extract actual journal URL from javascript blob in the HTML
+ if '://insights.ovid.com/crossref' in html_url:
+ # var journalURL = "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689";
+ m = OVID_JOURNAL_URL_REGEX.search(html_body.decode('utf-8'))
+ if m:
+ url = m.group(1)
+ assert len(url) < 4096
+ return dict(next_url=url, technique='ovid')
+
+ # osf.io
+ # https://osf.io/8phvx/
+ # https://osf.io/preprints/socarxiv/8phvx/
+ # wow, they ship total javascript crud! going to just guess download URL
+ # based on URL for now. Maybe content type header would help?
+ OSF_DOMAINS = [
+ '://osf.io/',
+ '://biohackrxiv.org/',
+ '://psyarxiv.com/',
+ '://arabixiv.org/',
+ '://engrxiv.org/',
+ '://edarxiv.org/',
+ '://ecsarxiv.org/',
+ '://ecoevorxiv.org/',
+ '://frenxiv.org/',
+ '://indiarxiv.org/',
+ '://mindrxiv.org/',
+ '://mediarxiv.org/',
+ '://paleorxiv.org/',
+ '://thesiscommons.org/',
+ ]
+ for domain in OSF_DOMAINS:
+ if domain in html_url and (len(html_url.split('/')) in [4,5] or '/preprints/' in html_url) and '/download' not in html_url:
+ if not html_url.endswith("/"):
+ next_url = html_url+"/download"
+ else:
+ next_url = html_url+"download"
+ return dict(next_url=next_url, technique='osf-by-url')
+
+ # wiley
+ # https://onlinelibrary.wiley.com/doi/pdf/10.1111/1467-923X.12787
+ if "://onlinelibrary.wiley.com/doi/pdf/" in html_url:
+ if b"/doi/pdfdirect/" in html_body:
+ next_url = html_url.replace('/doi/pdf/', '/doi/pdfdirect/')
+ return dict(next_url=next_url, technique='wiley-pdfdirect')
+
+ # arxiv abstract pages
+ if "://arxiv.org/abs/" in html_url:
+ url = html_url.replace("/abs/", "/pdf/")
+ return dict(pdf_url=url, technique='arxiv-url')
+
+ # american archivist (OA)
+ # https://americanarchivist.org/doi/abs/10.17723/aarc.62.2.j475270470145630
+ if "://americanarchivist.org/doi/" in html_url and "/doi/pdf" not in html_url:
+ # use a more aggressive direct guess to avoid rate-limiting...
+ if "/doi/10." in html_url:
+ url = html_url.replace("/doi/10.", "/doi/pdf/10.")
+ return dict(pdf_url=url, technique='archivist-url')
+ # <a href="/doi/pdf/10.17723/aarc.62.2.j475270470145630" target="_blank">
+ hrefs = soup.find_all('a', attrs={"target":"_blank"})
+ for href in hrefs:
+ url = href['href'].strip()
+ if "/doi/pdf/" in url:
+ if url.startswith('http'):
+ return dict(pdf_url=url, technique='publisher-href')
+ elif url.startswith('/'):
+ return dict(pdf_url=host_prefix+url, technique='publisher-href')
+
+ # protocols.io
+ # https://www.protocols.io/view/flow-cytometry-protocol-mgdc3s6
+ if "://www.protocols.io/view/" in html_url and not html_url.endswith(".pdf"):
+ url = html_url + ".pdf"
+ return dict(pdf_url=url, technique='protocolsio-url')
+
+ # degruyter.com
+ # https://www.degruyter.com/view/books/9783486594621/9783486594621-009/9783486594621-009.xml
+ if "://www.degruyter.com/view/" in html_url and html_url.endswith(".xml"):
+ url = html_url.replace('/view/', '/downloadpdf/').replace('.xml', '.pdf')
+ return dict(pdf_url=url, technique='degruyter-url')
+
+ # journals.lww.com (Wolters Kluwer)
+ # https://journals.lww.com/spinejournal/Abstract/publishahead/Making_the_Most_of_Systematic_Reviews_and.94318.aspx
+ # DISABLED: they seem to redirect our crawler back to a "Fulltext" page and
+ # we never get the content.
+ if "://journals.lww.com/" in html_url and False:
+ # data-pdf-url="https://pdfs.journals.lww.com/spinejournal/9000/00000/Making_the_Most_of_Systematic_Reviews_and.94318.pdf?token=method|ExpireAbsolute;source|Journals;ttl|1582413672903;payload|mY8D3u1TCCsNvP5E421JYK6N6XICDamxByyYpaNzk7FKjTaa1Yz22MivkHZqjGP4kdS2v0J76WGAnHACH69s21Csk0OpQi3YbjEMdSoz2UhVybFqQxA7lKwSUlA502zQZr96TQRwhVlocEp/sJ586aVbcBFlltKNKo+tbuMfL73hiPqJliudqs17cHeLcLbV/CqjlP3IO0jGHlHQtJWcICDdAyGJMnpi6RlbEJaRheGeh5z5uvqz3FLHgPKVXJzdiVgCTnUeUQFYzcJRFhNtc2gv+ECZGji7HUicj1/6h85Y07DBRl1x2MGqlHWXUawD;hash|6cqYBa15ZK407m4VhFfJLw=="
+ for line in html_body.split(b'\n'):
+ if b"data-pdf-url=" in line:
+ line = line.decode('utf-8')
+ url = line.strip().replace('data-pdf-url=', '').replace('"', '')
+ if url.startswith('http') and 'pdfs.journals.lww.com' in url:
+ return dict(pdf_url=url, technique='journals.lww.com-jsvar')
+
+ # www.ahajournals.org
+ # https://www.ahajournals.org/doi/10.1161/circ.110.19.2977
+ if "://www.ahajournals.org/doi/" in html_url and '/doi/pdf/' not in html_url:
+ # <a href="/doi/pdf/10.1161/circ.110.19.2977?download=true">PDF download</a>
+ if b'/doi/pdf/10.' in html_body:
+ url = html_url.replace('/doi/10.', '/doi/pdf/10.')
+ url = url + "?download=true"
+ return dict(pdf_url=url, technique='ahajournals-url')
+
+ # ehp.niehs.nih.gov
+ # https://ehp.niehs.nih.gov/doi/full/10.1289/EHP4709
+ # https://ehp.niehs.nih.gov/doi/10.1289/ehp.113-a51
+ if "://ehp.niehs.nih.gov/doi/" in html_url:
+ # <a href="/doi/pdf/10.1289/EHP4709" target="_blank">
+ if b'/doi/pdf/10.' in html_body:
+ url = html_url.replace('/doi/full/10.', '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.')
+ return dict(pdf_url=url, technique='ehp.niehs.nih.gov-url')
+
+ # cogentoa.com
+ # https://www.cogentoa.com/article/10.1080/23311975.2017.1412873
+ if "://www.cogentoa.com/article/" in html_url and ".pdf" not in html_url:
+ # blech, it's a SPA! All JS
+ # https://www.cogentoa.com/article/10.1080/23311975.2017.1412873.pdf
+ url = html_url + ".pdf"
+ return dict(pdf_url=url, technique='cogentoa-url')
+
+ # chemrxiv.org (likely to be other figshare domains also)
+ # https://chemrxiv.org/articles/Biradical_Formation_by_Deprotonation_in_Thiazole-Derivatives_The_Hidden_Nature_of_Dasatinib/10101419
+ if "://chemrxiv.org/articles/" in html_url or '.figshare.org/articles/' in html_url:
+ # <script id="app-data" type="text/json"> [...] </script>
+ json_tag = soup.find('script', id="app-data", attrs={"type": "text/json"})
+ if json_tag and json_tag.string:
+ app_data = json.loads(json_tag.string)
+ # "exportPdfDownloadUrl": "https://s3-eu-west-1.amazonaws.com/itempdf74155353254prod/10101419/Biradical_Formation_by_Deprotonation_in_Thiazole-Derivatives__The_Hidden_Nature_of_Dasatinib_v1.pdf"
+ url = app_data.get('article', {}).get('exportPdfDownloadUrl')
+ if url and url.startswith('http'):
+ return dict(pdf_url=url, technique='figshare-json')
+
+ # CNKI COVID-19 landing pages
+ # http://en.gzbd.cnki.net/gzbt/detail/detail.aspx?FileName=HBGF202002003&DbName=GZBJ7920&DbCode=GZBJ
+ if '://en.gzbd.cnki.net/KCMS/detail/detail.aspx' in html_url:
+ # <a onclick="WriteKrsDownLog()" target="_blank" id="pdfDown" name="pdfDown" href="/gzbt/download.aspx?filename=4Q1ZYpFdKFUZ6FDR1QkRrolayRXV2ZzattyQ3QFa2JXTyZXUSV3QRFkbndzaGV2KyJXWZVEbFdVYnZndD9EOxg1Tj5Eeys2SMFzLZ5kcuFkM3dEbsR2ZjxEaShVdJhFdp90KhlVVzcjVVlXUVNHWBtWS5Rlb5cnc&amp;tablename=GZBJLAST2020&amp;dflag=pdfdown&#xA; "><i></i>PDF Download</a>
+ href = soup.find('a', attrs={"id":"pdfDown"})
+ if href:
+ url = href['href'].strip().replace('&#xA;', '')
+ if not url.startswith('http'):
+ url = host_prefix + url
+ return dict(pdf_url=url, technique='cnki-href')
+
+ # RWTH AACHEN repository
+ if '://publications.rwth-aachen.de/record/' in html_url:
+ record_id = html_url.split('/')[-1]
+ url = f"{html_url}/files/{record_id}.pdf"
+ if record_id.isdigit() and url.encode('utf-8') in html_body:
+ return dict(pdf_url=url, technique='rwth-aachen-url')
+
+ # physchemaspects.ru
+ if '://physchemaspects.ru/' in html_url and soup:
+ for href in soup.find_all('a'):
+ if href.text == "download PDF file":
+ url = href['href']
+ if url.startswith('/'):
+ url = host_prefix + url
+ return dict(pdf_url=url, technique='physchemaspects-href')
+
+ # OJS 3 (some)
+ if meta_generator and meta_generator.startswith("Open Journal Systems"):
+ href = soup.find('a', attrs={"class":"obj_galley_link file"})
+ if href and href.text and "pdf" in href.text.lower():
+ url = href['href'].strip()
+ if url.startswith('/'):
+ url = host_prefix + url
+ return dict(pdf_url=url, technique='ojs-galley-href')
+
+ # ETH zurich e-periodica
+ if '://www.e-periodica.ch/digbib/view' in html_url:
+ url = html_url.replace('digbib/view', 'cntmng').split('#')[0]
+ if url.encode('utf-8') in html_body:
+ return dict(pdf_url=url, technique='href-eperiodica')
+
+ # JMIR
+ # https://mhealth.jmir.org/2020/7/e17891/
+ if '.jmir.org/' in html_url and "/pdf" not in html_url and html_url.endswith("/"):
+ url = html_url + "pdf"
+ return dict(pdf_url=url, technique='jmir-url')
+
+ ### below here we are doing guesses
+
+ # generic guess: try current URL plus .pdf, if it exists in the HTML body
+ if '.pdf' not in html_url:
+ url = html_url + ".pdf"
+ if url.encode('utf-8') in html_body:
+ return dict(pdf_url=url, technique='guess-url-plus-pdf')
+
+ return dict()
+
+def test_regex():
+ lines = """
+ blah
+ var journalURL = "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689";
+ asdf"""
+ m = OVID_JOURNAL_URL_REGEX.search(lines)
+ assert m.group(1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"
+
+ lines = """
+ window.onload = function () {
+ window.location = 'https://pdf.sciencedirectassets.com/320270/AIP/1-s2.0-S2590109519300424/main.pdf?X-Amz-Security-Token=IQoJb3JpZ2luX2VjEH0aCXVzLWVhc3QtMSJGMEQCICBF0dnrtKfpcs3T1kOjMS9w9gedqiLBrcbp4aKQSP8fAiAT9G426t6FWXHO2zPSXRFLq2eiqgbew2vkNKbcn87teyq9Awj1%2F%2F%2F%2F%2F%2F%2F%2F%2F%2F8BEAIaDDA1OTAwMzU0Njg2NSIMnZcTRhbvMwF%2F5PA5KpEDdN%2FDI4V%2BNMDWQDFeAdUc99Lyxak%2B6vhAsfCBCf8hhvrRpalz75e74%2FXMAQwMN9m6i98o0Ljv9od7cuQEy8t%2B0DLzjzX5n3%2FxmpttowhMUm1jc8tBniLKBjwhTyiSHwhdeaVZf6x2zCJ0EIOWMNJHp3iFEqpaFvkRZbC1KWK4XPNNKo72HCvXuG7xmGrdHByz91AP7UgIYCy4hT10fnM43gbOE4wW8fqpgnvwCId%2F2u8k4rQoCLBqLYZzqshCRm1DBbsXCQhTwDXiMC2Ek3f63yKgw7rRCAxvs0vqirG%2B4mJ6LADaztAFMtKDPfnd4e%2B7%2FvnKU2NeotrqrkRgOkIAoFumbQXf20ky6mKWyHBk%2FxirVp60vUcLQpUm2Pcp6ythYxUi9IJxRGX8EF6aV4UHuCpUDUE7o8N84KUXIedUpytUZx7Xoxfk9w%2BR3%2FgX4LEHfkrWgiFAS3bVxNGOeV7GTwcXdcAggbdCaiAe46dfv7DDedx0KhVKOPH7obfvShqd6TYc0BjrV4sx61594ZJ3%2FO0ws7Lj8AU67AF17%2B1NZ3Ugu%2BwG9Ys9s7OxG8E4kBJ58vEY1yuBOQK9y2we4%2FTGPuqSxCuezqA%2BseslXYP%2FRc%2FZL9xx%2FUYaSjZhk1p1mhojxgBrckJYU7d8c4ELMPmtVy6R1yd2VDUoawEU8SB7nbNnMKzqQ3RgGgqGJiELys6dt%2FIr%2BVhpqM%2FZT4zadvzs8P%2FLoGzUHJKNZt0f99wLvZilphV92E%2BOUnwC4wbg3i3af3zozULwgEr7T%2FX2VsyREgexlzk76qMALPn0lgnciUyyQXxyUWAilXYQ0mQdXefh9lFfycczvt0UEuarX9p1sMwl8Ve5aw%3D%3D&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20200110T210936Z&X-Amz-SignedHeaders=host&X-Amz-Expires=300&X-Amz-Credential=ASIAQ3PHCVTY23CMDBNC%2F20200110%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=b43525576e1a0fdbab581481a3fe6db2862cbb2c69f2860b70cc8d444ccd73d5&hash=ccd128dfe597e704224bdfb4b3358de29b2be5d95887c71076bdab1236ba9e42&host=68042c943591013ac2b2430a89b270f6af2c76d8dfd086a07176afe7c76c2c61&pii=S2590109519300424&tid=spdf-74468ebd-6be6-43ac-b294-ced86e8eea58&sid=f9676d658285a749c46b6d081d965bb12aa8gxrqa&type=client';
+ refreshOriginalWindow();
+ }
+ """
+ url = "https://pdf.sciencedirectassets.com/320270/AIP/1-s2.0-S2590109519300424/main.pdf?X-Amz-Security-Token=IQoJb3JpZ2luX2VjEH0aCXVzLWVhc3QtMSJGMEQCICBF0dnrtKfpcs3T1kOjMS9w9gedqiLBrcbp4aKQSP8fAiAT9G426t6FWXHO2zPSXRFLq2eiqgbew2vkNKbcn87teyq9Awj1%2F%2F%2F%2F%2F%2F%2F%2F%2F%2F8BEAIaDDA1OTAwMzU0Njg2NSIMnZcTRhbvMwF%2F5PA5KpEDdN%2FDI4V%2BNMDWQDFeAdUc99Lyxak%2B6vhAsfCBCf8hhvrRpalz75e74%2FXMAQwMN9m6i98o0Ljv9od7cuQEy8t%2B0DLzjzX5n3%2FxmpttowhMUm1jc8tBniLKBjwhTyiSHwhdeaVZf6x2zCJ0EIOWMNJHp3iFEqpaFvkRZbC1KWK4XPNNKo72HCvXuG7xmGrdHByz91AP7UgIYCy4hT10fnM43gbOE4wW8fqpgnvwCId%2F2u8k4rQoCLBqLYZzqshCRm1DBbsXCQhTwDXiMC2Ek3f63yKgw7rRCAxvs0vqirG%2B4mJ6LADaztAFMtKDPfnd4e%2B7%2FvnKU2NeotrqrkRgOkIAoFumbQXf20ky6mKWyHBk%2FxirVp60vUcLQpUm2Pcp6ythYxUi9IJxRGX8EF6aV4UHuCpUDUE7o8N84KUXIedUpytUZx7Xoxfk9w%2BR3%2FgX4LEHfkrWgiFAS3bVxNGOeV7GTwcXdcAggbdCaiAe46dfv7DDedx0KhVKOPH7obfvShqd6TYc0BjrV4sx61594ZJ3%2FO0ws7Lj8AU67AF17%2B1NZ3Ugu%2BwG9Ys9s7OxG8E4kBJ58vEY1yuBOQK9y2we4%2FTGPuqSxCuezqA%2BseslXYP%2FRc%2FZL9xx%2FUYaSjZhk1p1mhojxgBrckJYU7d8c4ELMPmtVy6R1yd2VDUoawEU8SB7nbNnMKzqQ3RgGgqGJiELys6dt%2FIr%2BVhpqM%2FZT4zadvzs8P%2FLoGzUHJKNZt0f99wLvZilphV92E%2BOUnwC4wbg3i3af3zozULwgEr7T%2FX2VsyREgexlzk76qMALPn0lgnciUyyQXxyUWAilXYQ0mQdXefh9lFfycczvt0UEuarX9p1sMwl8Ve5aw%3D%3D&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20200110T210936Z&X-Amz-SignedHeaders=host&X-Amz-Expires=300&X-Amz-Credential=ASIAQ3PHCVTY23CMDBNC%2F20200110%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=b43525576e1a0fdbab581481a3fe6db2862cbb2c69f2860b70cc8d444ccd73d5&hash=ccd128dfe597e704224bdfb4b3358de29b2be5d95887c71076bdab1236ba9e42&host=68042c943591013ac2b2430a89b270f6af2c76d8dfd086a07176afe7c76c2c61&pii=S2590109519300424&tid=spdf-74468ebd-6be6-43ac-b294-ced86e8eea58&sid=f9676d658285a749c46b6d081d965bb12aa8gxrqa&type=client"
+ m = SCIENCEDIRECT_BOUNCE_URL_REGEX.search(lines)
+ assert m.group(1) == url
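For illustration, extract_fulltext_url() can be exercised against a fetched landing page like this (fetching with requests is only for the example, and the URL is a placeholder; in the ingest pipeline the HTML body comes from wayback):

    import requests

    from sandcrawler.html import extract_fulltext_url

    html_url = "https://arxiv.org/abs/2004.01354"  # placeholder arxiv abstract URL
    resp = requests.get(html_url, timeout=30)
    hit = extract_fulltext_url(html_url, resp.content)
    # the arxiv-url rule above would yield something like:
    # {'pdf_url': 'https://arxiv.org/pdf/2004.01354', 'technique': 'arxiv-url'}
    print(hit)
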
diff --git a/python/sandcrawler/html_ingest.py b/python/sandcrawler/html_ingest.py
new file mode 100644
index 0000000..f11cac4
--- /dev/null
+++ b/python/sandcrawler/html_ingest.py
@@ -0,0 +1,441 @@
+
+import io
+import sys
+import json
+import datetime
+import argparse
+import xml.etree.ElementTree as ET
+from typing import List, Optional, Any, Tuple
+
+import trafilatura
+import pydantic
+from selectolax.parser import HTMLParser
+
+from sandcrawler.ia import WaybackClient, CdxApiClient, ResourceResult, cdx_to_dict, fix_transfer_encoding, NoCaptureError, WaybackContentError
+from sandcrawler.misc import gen_file_metadata, parse_cdx_datetime, datetime_to_cdx, clean_url, url_fuzzy_equal
+from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules
+
+
+TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}"
+
+def html_extract_body_teixml(doc: bytes) -> dict:
+ try:
+ tei_xml = trafilatura.extract(doc,
+ tei_output=True,
+ include_comments=False,
+ include_formatting=True,
+ )
+ except (ValueError, TypeError, Exception) as e:
+ return dict(
+ status="trafilatura-parse-error",
+ error_msg=str(e)[:1000],
+ )
+ if tei_xml:
+ body_txt = teixml_body_text(tei_xml)
+ word_count = len(body_txt.split())
+ return dict(status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count)
+ elif doc.startswith(b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 2012"http://www.w3.org/TR/html4/loose.dtd">'):
+ # hack for firstmonday.org
+ return html_extract_body_teixml(doc[106:])
+ else:
+ return dict(status="empty-xml", agent=TRAFILATURA_AGENT)
+
+def teixml_body_text(doc_xml: str) -> str:
+ ns = {"tei": "http://www.tei-c.org/ns/1.0"}
+ tree = ET.fromstring(doc_xml)
+ body = tree.find('.//tei:body', ns)
+ if body is not None:
+ return " ".join(body.itertext())
+ else:
+ return ""
+
+class WebResource(pydantic.BaseModel):
+ surt: str
+ timestamp: datetime.datetime
+ url: str
+ sha1hex: str
+ mimetype: str
+ status_code: int
+ size: Optional[int]
+ sha256hex: Optional[str]
+ resource_type: Optional[str]
+
+ class Config:
+ json_encoders = {
+ datetime.datetime: lambda dt: dt.isoformat()
+ }
+
+class IngestWebResult(pydantic.BaseModel):
+ status: str
+ hit: bool
+ error_message: Optional[str]
+ cdx: Optional[dict]
+ terminal: Optional[Any] # TODO
+ request: Optional[Any] # TODO
+ file_meta: Optional[dict]
+ html_biblio: Optional[BiblioMetadata]
+ scope: Optional[str]
+ html_body: Optional[dict]
+ html_resources: Optional[List[WebResource]]
+
+ class Config:
+ arbitrary_types_allowed = True
+ json_encoders = {
+ datetime.datetime: lambda dt: dt.isoformat(),
+ }
+
+class HtmlMetaRow(pydantic.BaseModel):
+ sha1hex: str
+ status: str
+ scope: Optional[str]
+ has_teixml: bool
+ has_thumbnail: bool
+ word_count: Optional[int]
+ biblio: Optional[dict]
+ resources: Optional[List[dict]]
+
+ class Config:
+ arbitrary_types_allowed = True
+ json_encoders = {
+ datetime.datetime: lambda dt: dt.isoformat(),
+ }
+
+ def to_sql_tuple(self) -> Tuple:
+ """
+ This is for the html_meta SQL table.
+ """
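+ # NOTE: field order here must line up with the column list in
+ # SandcrawlerPostgresClient.insert_html_meta() (db.py)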
+ return (
+ self.sha1hex,
+ datetime.datetime.now(), # updated
+ self.status,
+ self.scope,
+ self.has_teixml,
+ self.has_thumbnail,
+ self.word_count,
+ (self.biblio or None) and json.dumps(self.biblio, sort_keys=True),
+ (self.resources or None) and json.dumps(self.resources, sort_keys=True),
+ )
+
+
+def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime]) -> List[WebResource]:
+ """
+ This is the lazy version that just does a CDX lookup for each resource.
+
+ Takes a list instead of single record because we may want to circuit break
+ on failure, and may introduce concurrency internal to this function.
+ """
+
+ full = []
+ closest = when and datetime_to_cdx(when)
+ for resource in resources:
+ cdx_row = cdx_client.lookup_best(resource['url'], closest=closest)
+ if not cdx_row:
+ raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
+ if cdx_row.url != resource['url'] and not url_fuzzy_equal(cdx_row.url, resource['url']):
+ print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr)
+ if not cdx_row.status_code:
+ # TODO: fall back to a full fetch?
+ print(f" WARN: skipping revisit record", file=sys.stderr)
+ continue
+ full.append(WebResource(
+ surt=cdx_row.surt,
+ timestamp=cdx_row.datetime,
+ url=cdx_row.url,
+ sha1hex=cdx_row.sha1hex,
+ mimetype=cdx_row.mimetype,
+ status_code=cdx_row.status_code,
+ size=None,
+ sha256hex=None,
+ resource_type=resource['type'],
+ ))
+
+ return full
+
+
+def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]) -> List[WebResource]:
+ """
+ This is the full version which fetches each resource from wayback/petabox
+ and calculates additional hashes.
+
+ Could make this concurrent in the future, eg: https://realpython.com/python-concurrency/#threading-version
+ """
+
+ full = []
+ closest = when and datetime_to_cdx(when)
+ for resource in resources:
+ wayback_resp = wayback_client.lookup_resource(resource['url'], closest=closest)
+ if not wayback_resp or wayback_resp.status != 'success':
+ raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
+ file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True)
+ if file_meta['sha1hex'] != wayback_resp.cdx.sha1hex:
+ raise WaybackContentError(f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}")
+ full.append(WebResource(
+ surt=wayback_resp.cdx.surt,
+ timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime),
+ url=wayback_resp.cdx.url,
+ sha1hex=file_meta['sha1hex'],
+ mimetype=file_meta['mimetype'],
+ status_code=wayback_resp.cdx.status_code or wayback_resp.revisit_cdx.status_code,
+ size=file_meta['size_bytes'],
+ sha256hex=file_meta['sha256hex'],
+ resource_type=resource['type'],
+ ))
+
+ return full
+
+
+def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]) -> Optional[str]:
+
+ generator: Optional[str] = None
+ generator_elem = doc.css_first("meta[name='generator']")
+ if generator_elem:
+ generator = generator_elem.attrs['content']
+ else:
+ generator_elem = doc.css_first("a[id='developedBy']")
+ if generator_elem:
+ generator = generator_elem.text()
+ if generator and "open journal systems 3" in generator.lower():
+ return "ojs3"
+ elif generator and "open journal systems" in generator.lower():
+ return "ojs"
+ elif generator and "plone" in generator.lower():
+ return "plone"
+ elif generator and "wordpress" in generator.lower():
+ return "wordpress"
+ elif generator and "blogger" in generator.lower():
+ return "blogger"
+ elif doc.css_first("body[id='pkp-common-openJournalSystems']"):
+ return "ojs"
+ else:
+ try:
+ if 'powered by <a target="blank" href="http://pkp.sfu.ca/ojs/">PKP OJS</a>' in doc.html:
+ return "ojs"
+ if 'Powered by <a target="_blank" href="http://arphahub.com">' in doc.html:
+ return "arpha"
+ if "<meta property='og:image' content='http://cms.galenos.com.tr' />" in doc.html:
+ return "galenos"
+ except UnicodeDecodeError:
+ pass
+
+ icon_elem = doc.css_first("link[type='image/x-icon']")
+ if icon_elem and 'href' in icon_elem.attrs:
+ if 'journalssystem.com' in icon_elem.attrs['href']:
+ return "journalssystem.com"
+ elif 'indexcopernicus.com' in icon_elem.attrs['href']:
+ return "indexcopernicus"
+
+ if 'scielo' in url:
+ return "scielo"
+
+ return None
+
+def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]) -> str:
+ """
+ This function tries to guess if an HTML document represents one of:
+
+ - article-fulltext
+ - article-abstract
+ - article-sample
+ - supplement
+ - component
+ - issue-fulltext
+ - landingpage
+ - homepage-domain
+ - blocked-paywall
+ - blocked-login
+ - blocked-captcha
+ - blocked-cookie
+ - blocked-forbidden
+ - errorpage
+ - stub
+ - other
+ - unknown
+
+ Unknown implies the page could be anything. "other" implies it is not
+ fulltext or a landing page, but could be one of the other categories.
+ """
+
+ # assert that this is a real URL
+ assert url.count('/') >= 2
+
+ # basic paywall and loginwall detection based on URL
+ if url.endswith("/cookieAbsent"):
+ return "blocked-cookie"
+ if "://page-one.live.cf.public.springer.com" in url:
+ return "article-sample"
+
+ if "scielo" in url:
+ if "sci_abstract" in url:
+ return "landingpage"
+ if "sci_arttext" in url:
+ return "article-fulltext"
+
+ if "showcaptcha.asp" in url:
+ return "blocked-captcha"
+
+ # is this the top-level URL of the domain? aka, no path?
+ if url.count('/') <= 2 or (url.count('/') == 3 and url.endswith('/')):
+ return "homepage-domain"
+
+ platform = html_guess_platform(url, doc, biblio)
+
+ if biblio:
+ if biblio.html_fulltext_url:
+ if url_fuzzy_equal(biblio.html_fulltext_url, url):
+ return "article-fulltext"
+ else:
+ return "landingpage"
+
+ # platform-specific detection
+ if platform in ("ojs", "ojs3"):
+
+ if biblio and biblio.title:
+ if word_count and word_count > 1200:
+ return "article-fulltext"
+ else:
+ return "landingpage"
+ else:
+ if "/article/view/" in url and word_count and word_count > 600:
+ return "article-fulltext"
+ return "other"
+ elif platform == "journalssystem.com":
+ if biblio and biblio.pdf_fulltext_url and word_count and word_count < 1000:
+ return "landingpage"
+
+ # more platform/publisher specific checks
+ if "karger.com/Article/Abstract" in url:
+ return "landingpage"
+ if "dergipark.gov.tr" in url and not ("download/article-file" in url):
+ return "other"
+
+ try:
+ if isinstance(doc.html, str) and "<center><h1>403 Forbidden</h1></center>" in doc.html:
+ # cloudflare block pattern
+ return "blocked-forbidden"
+ except UnicodeDecodeError:
+ pass
+
+ print(f" scope guessing: platform {platform} word count: {word_count}", file=sys.stderr)
+
+ # fallback: guess based on word count (arbitrary guesses here)
+ if word_count is not None:
+ if word_count < 20:
+ return "stub"
+ elif word_count > 500 and platform in ['wordpress', 'blogger']:
+ return "article-fulltext"
+ elif word_count > 1200:
+ return "article-fulltext"
+
+ return "unknown"
+
+
+def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = False) -> IngestWebResult:
+
+ adblock = load_adblock_rules()
+ wayback_client = WaybackClient()
+
+ html_resource = wayback_client.lookup_resource(url, "text/html", closest=timestamp)
+ if html_resource.status != "success":
+ return IngestWebResult(
+ status=html_resource.status,
+ hit=False,
+ cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
+ )
+
+ assert html_resource.terminal_status_code == 200
+
+ file_meta = gen_file_metadata(html_resource.body)
+ file_meta, html_resource = fix_transfer_encoding(file_meta, html_resource)
+
+ if file_meta['mimetype'] not in ("text/html", "text/xml"):
+ return IngestWebResult(
+ status="wrong-mimetype",
+ hit=False,
+ cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
+ file_meta=file_meta,
+ )
+
+ html_doc = HTMLParser(html_resource.body)
+ html_biblio = html_extract_biblio(url, html_doc)
+ html_body = html_extract_body_teixml(html_resource.body)
+ html_scope = html_guess_scope(url, html_doc, html_biblio, html_body.get('word_count'))
+ if html_scope not in ('article-fulltext', 'unknown'):
+ return IngestWebResult(
+ status="wrong-scope",
+ hit=False,
+ cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
+ file_meta=file_meta,
+ html_biblio=html_biblio,
+ scope=html_scope,
+ )
+
+ raw_resources = html_extract_resources(html_resource.terminal_url, html_doc, adblock)
+ assert len(raw_resources) <= 200
+
+ when = parse_cdx_datetime(html_resource.cdx.datetime)
+
+ full_resources: List[WebResource] = []
+ if quick_mode:
+ full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, when)
+ else:
+ full_resources = fetch_html_resources(raw_resources, wayback_client, when)
+
+ output = IngestWebResult(
+ status="success",
+ hit=True,
+ cdx=html_resource.cdx and cdx_to_dict(html_resource.cdx),
+ file_meta=file_meta,
+ html_body=html_body,
+ html_biblio=html_biblio,
+ scope=html_scope,
+ html_resources=full_resources,
+ )
+ return output
+
+
+def main() -> None:
+ """
+ Run this command like:
+
+ python -m sandcrawler.html_ingest single <url>
+ """
+
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter
+ )
+ subparsers = parser.add_subparsers()
+
+ sub = subparsers.add_parser(
+ "single", help="tries to ingest a single URL, dumps result to stdout"
+ )
+ sub.set_defaults(func="run_single")
+ sub.add_argument(
+ "url",
+ help="URL to fetch",
+ type=str,
+ )
+ sub.add_argument(
+ "--timestamp",
+ help="timestamp for which to fetch document from wayback",
+ type=str,
+ )
+ sub.add_argument(
+ "--quick-mode",
+ help="don't fetch resources, only do CDX lookup",
+ action="store_true",
+ )
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ parser.print_help(file=sys.stderr)
+ sys.exit(-1)
+
+ if args.func == "run_single":
+ result = run_single(args.url, args.timestamp, args.quick_mode)
+ print(result.json(indent=2, exclude_none=True))
+ else:
+ #func = getattr(wp, args.func)
+ #func()
+ raise NotImplementedError()
+
+if __name__ == "__main__":
+ main()
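run_single() can also be called from Python rather than through the CLI wrapper above (the URL is the JMIR example already used in html.py; wayback/CDX access is required for this to actually resolve):

    from sandcrawler.html_ingest import run_single

    result = run_single("https://mhealth.jmir.org/2020/7/e17891/", quick_mode=True)
    print(result.json(indent=2, exclude_none=True))
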
diff --git a/python/sandcrawler/html_metadata.py b/python/sandcrawler/html_metadata.py
new file mode 100644
index 0000000..1a328ef
--- /dev/null
+++ b/python/sandcrawler/html_metadata.py
@@ -0,0 +1,857 @@
+
+import sys
+import datetime
+from typing import List, Optional, Any, Tuple, Dict
+import urllib.parse
+
+import dateparser
+from selectolax.parser import HTMLParser
+import pydantic
+import braveblock
+
+from sandcrawler.misc import url_fuzzy_equal
+
+
+# this is a map of metadata keys to CSS selectors
+# sources for this list include:
+# - google scholar crawling notes (https://scholar.google.com/intl/ja/scholar/inclusion.html#indexing)
+# - inspection of actual publisher HTML
+# - http://div.div1.com.au/div-thoughts/div-commentaries/66-div-commentary-metadata
+# - "HTML meta tags used by journal articles"
+# https://gist.github.com/hubgit/5985963
+# the order of these is mostly by preference/quality (best option first),
+# though some are re-ordered for lookup efficiency (lookup stops after the
+# first match)
+HEAD_META_PATTERNS: Any = {
+ "title": [
+ "meta[name='citation_title']",
+ "meta[name='eprints.title']",
+ "meta[name='prism.title']",
+ "meta[name='bepress_citation_title']",
+ "meta[name='og:title']",
+ "meta[name='dcterms.title']",
+ "meta[name='dc.title']",
+ ],
+ "subtitle": [
+ "meta[name='prism.subtitle']",
+ ],
+ "doi": [
+ "meta[name='citation_doi']",
+ "meta[name='DOI']",
+ "meta[id='DOI']",
+ "meta[name='prism.doi']",
+ "meta[name='bepress_citation_doi']",
+ "meta[name='dc.identifier.doi']",
+ "meta[name='dc.identifier'][scheme='doi']",
+ ],
+ "pmid": [
+ "meta[name='citation_pmid']",
+ ],
+ "abstract": [
+ "meta[name='citation_abstract']",
+ "meta[name='bepress_citation_abstract']",
+ "meta[name='eprints.abstract']",
+ "meta[name='dcterms.abstract']",
+ "meta[name='prism.teaser']",
+ "meta[name='dc.description']",
+ "meta[name='og:description']",
+ ],
+ "container_name": [
+ "meta[name='citation_journal_title']",
+ "meta[name='bepress_citation_journal_title']",
+ "meta[name='citation_conference_title']",
+ "meta[name='bepress_citation_conference_title']",
+ "meta[name='prism.publicationName']",
+ "meta[name='eprints.publication']",
+ "meta[name='dc.relation.ispartof']",
+ "meta[name='dc.source']",
+ "meta[property='og:site_name']",
+ ],
+ "container_abbrev": [
+ "meta[name='citation_journal_abbrev']",
+ ],
+ "raw_date": [
+ "meta[name='citation_publication_date']",
+ "meta[name='bepress_citation_publication_date']",
+ "meta[name='prism.publicationDate']",
+ "meta[name='citation_date']",
+ "meta[name='bepress_citation_date']",
+ "meta[name='citation_online_date']",
+ "meta[name='bepress_citation_online_date']",
+ "meta[itemprop='datePublished']",
+ "meta[name='article:published']",
+ "meta[name='eprints.datestamp']",
+ "meta[name='eprints.date']",
+ "meta[name='dc.date.created']",
+ "meta[name='dc.issued']",
+ "meta[name='dcterms.date']",
+ "meta[name='dc.date']",
+ ],
+ "release_year": [
+ "meta[itemprop='citation_year']",
+ "meta[itemprop='prism:copyrightYear']",
+ ],
+ "first_page": [
+ "meta[name='citation_firstpage']",
+ "meta[name='bepress_citation_firstpage']",
+ "meta[name='prism.startingPage']",
+ "meta[name='dc.citation.spage']",
+ ],
+ "last_page": [
+ "meta[name='citation_lastpage']",
+ "meta[name='bepress_citation_lastpage']",
+ "meta[name='prism.endingPage']",
+ "meta[name='dc.citation.epage']",
+ ],
+ "issue": [
+ "meta[name='citation_issue']",
+ "meta[name='bepress_citation_issue']",
+ "meta[name='prism.issueIdentifier']",
+ "meta[name='dc.citation.issue']",
+ ],
+ "volume": [
+ "meta[name='citation_volume']",
+ "meta[name='bepress_citation_volume']",
+ "meta[name='prism.volume']",
+ "meta[name='dc.citation.volume']",
+ ],
+ "number": [
+ "meta[name='citation_technical_report_number']",
+ "meta[name='bepress_citation_technical_report_number']",
+ "meta[name='citation_number']",
+ "meta[name='bepress_citation_number']",
+ "meta[name='prism.number']",
+ ],
+ "container_issn": [
+ "meta[name='citation_issn']",
+ "meta[name='bepress_citation_issn']",
+ "meta[name='prism.issn']",
+ "meta[name='prism.eIssn']",
+ "meta[name='eprints.issn']",
+ "meta[name='dc.source.issn']",
+ ],
+ "isbn": [
+ "meta[name='citation_isbn']",
+ "meta[name='bepress_citation_isbn']",
+ "meta[name='prism.isbn']",
+ ],
+ "publisher": [
+ "meta[name='citation_publisher']",
+ "meta[name='bepress_citation_publisher']",
+ "meta[name='eprints.publisher']",
+ "meta[name='citation_technical_report_institution']",
+ "meta[name='dcterms.publisher']",
+ "meta[name='dc.publisher']",
+ ],
+ "raw_release_type": [
+ "meta[name='citation_article_type']",
+ "meta[name='bepress_citation_article_type']",
+ "meta[name='prism.contentType']",
+ "meta[name='eprints.type']",
+ "meta[name='dc.type']",
+ ],
+ "lang": [
+ "meta[name='citation_language']",
+ "meta[name='bepress_citation_language']",
+ "meta[name='dcterms.language']",
+ "meta[name='dc.language']",
+ "meta[name='og:locale']",
+ ],
+}
+
+HEAD_META_LIST_PATTERNS: Any = {
+ "contrib_names": [
+ "meta[name='citation_author']",
+ "meta[name='bepress_citation_author']",
+ "meta[name='eprints.creators_name']",
+ "meta[name='dcterms.creator']",
+ "meta[name='article:author']",
+ "meta[name='dc.creator']",
+ "meta[name='dc.contributor']",
+ ],
+ # TODO: citation_author_institution
+ "raw_references": [
+ "meta[name='citation_reference']",
+ ],
+ "raw_identifiers": [
+ "meta[name='eprints.id_number']",
+ "meta[name='dcterms.identifier']",
+ "meta[name='dc.identifier']",
+ ],
+}
+
+XML_FULLTEXT_PATTERNS: List[dict] = [
+ {
+ "selector": "meta[name='citation_xml_url']",
+ "attr": "content",
+ "technique": "citation_xml_url",
+ },
+ {
+ "selector": "meta[name='fulltext_xml']",
+ "attr": "content",
+ "technique": "fulltext_xml",
+ },
+ {
+ "selector": "link[rel='alternate'][type='application/xml']",
+ "attr": "href",
+ "technique": "alternate link",
+ },
+ {
+ "selector": "link[rel='alternate'][type='text/xml']",
+ "attr": "href",
+ "technique": "alternate link",
+ },
+ {
+ "in_doc_url": "scielo",
+ "in_fulltext_url": "articleXML",
+ "selector": "a[target='xml']",
+ "attr": "href",
+ "technique": "SciElo XML link",
+ },
+ {
+ "in_doc_url": "/article/view/",
+ "in_fulltext_url": "viewXML",
+ "selector": "a[class='obj_galley_link']",
+ "attr": "href",
+ "technique": "OJS Gallery XML link",
+ },
+ {
+ "in_fulltext_url": "/download/xml/",
+ "selector": "a[title='XML']",
+ "attr": "href",
+ "technique": "ARPHA XML link",
+ "example_page": "https://zookeys.pensoft.net/article/26391",
+ },
+ {
+ "in_doc_url": "frontiersin.org/",
+ "in_fulltext_url": "xml",
+ "selector": "a.download-files-nlm",
+ "attr": "href",
+ "technique": "XML (NLM) download link (frontiersin.org)",
+ "example_page": "https://www.frontiersin.org/articles/10.3389/fnins.2021.722592/full",
+ },
+]
+
+HTML_FULLTEXT_PATTERNS: List[dict] = [
+ {
+ "selector": "meta[name='citation_fulltext_html_url']",
+ "attr": "content",
+ "technique": "citation_fulltext_html_url",
+ },
+ {
+ "selector": "link[rel='alternate'][type='text/html']",
+ "attr": "href",
+ "technique": "alternate link",
+ },
+ {
+ "in_doc_url": "/article/view/",
+ "in_fulltext_url": "inline=1",
+ "selector": "iframe[name='htmlFrame']",
+ "attr": "src",
+ "technique": "OJS HTML iframe",
+ },
+ {
+ "in_doc_url": "dovepress.com",
+ "in_fulltext_url": "-fulltext-",
+ "selector": "a[id='view-full-text']",
+ "attr": "href",
+ "technique": "dovepress fulltext link",
+ },
+]
+
+COMPONENT_FULLTEXT_PATTERNS: List[dict] = [
+ {
+ "in_doc_url": "pensoft.net/article/", # also /element/
+ "in_fulltext_url": "/download/fig/",
+ "selector": ".Main-Content .figure a.P-Article-Preview-Picture-Download-Small",
+ "attr": "href",
+ "technique": "Active figure download link (zookeys)",
+ "example_page": "https://zookeys.pensoft.net/article/38576/element/2/153/",
+ },
+]
+
+# This is a database of matching patterns. Most of these were discovered by
+# hand, looking at OA journal content that failed to crawl/ingest.
+PDF_FULLTEXT_PATTERNS: List[dict] = [
+ {
+ "selector": "head meta[name='citation_pdf_url']",
+ "attr": "content",
+ "technique": "citation_pdf_url",
+ },
+ {
+ "selector": "head meta[name='bepress_citation_pdf_url']",
+ "attr": "content",
+ "technique": "citation_pdf_url",
+ },
+ {
+ "in_doc_url": "journals.lww.com",
+ "selector": "head meta[name='wkhealth_pdf_url']",
+ "attr": "content",
+ "technique": "wkhealth_pdf_url",
+ "example_page": "https://journals.lww.com/otainternational/Fulltext/2019/03011/Trauma_systems_in_North_America.2.aspx",
+ },
+ {
+ "selector": "head meta[propery='citation_pdf_url']",
+ "attr": "content",
+ "technique": "citation_pdf_url",
+ # eg, researchgate
+ },
+ {
+ "selector": "head meta[name='eprints.document_url']",
+ "attr": "content",
+ "technique": "citation_pdf_url (property)",
+ },
+ {
+ "in_doc_url": "/doi/10.",
+ "in_fulltext_url": "/doi/pdf/",
+ "selector": "a.show-pdf",
+ "attr": "href",
+ "technique": "SAGE/UTP show-pdflink",
+ "example_page": "https://journals.sagepub.com/doi/10.1177/2309499019888836",
+ # also http://utpjournals.press/doi/10.3138/cjh.ach.54.1-2.05
+ },
+ {
+ "in_doc_url": "/doi/10.",
+ "in_fulltext_url": "/doi/pdf/",
+ "selector": "a[title='PDF']",
+ "attr": "href",
+ "technique": "title=PDF link",
+ "example_page": "https://pubs.acs.org/doi/10.1021/acs.estlett.9b00379",
+ },
+ {
+ "in_doc_url": "/article/view/",
+ "selector": "a#pdfDownloadLink",
+ "attr": "href",
+ "technique": "pdfDownloadLink link",
+ "example_page": "http://www.revistas.unam.mx/index.php/rep/article/view/35503/32336",
+ },
+ {
+ "in_fulltext_url": "/pdf/",
+ "selector": "a.show-pdf",
+ "attr": "href",
+ "technique": "SAGE PDF link",
+ "example_page": "http://journals.sagepub.com/doi/pdf/10.1177/2309499019888836",
+ },
+ {
+ "in_doc_url": "://elifesciences.org/articles/",
+ "in_fulltext_url": "/download/",
+ "selector": "a[data-download-type='pdf-article']",
+ "attr": "href",
+ "technique": "eLife PDF link",
+ "example_page": "https://elifesciences.org/articles/59841",
+ },
+ {
+ "in_doc_url": "://www.jcancer.org/",
+ "in_fulltext_url": ".pdf",
+ "selector": ".divboxright a.text-button",
+ "attr": "href",
+ "technique": "jcancer PDF link",
+ "example_page": "https://www.jcancer.org/v10p4038.htm",
+ },
+ {
+ "in_doc_url": "://www.tandfonline.com/doi/full/10.",
+ "in_fulltext_url": "/pdf/",
+ "selector": "a.show-pdf",
+ "attr": "href",
+ "technique": "t+f show-pdf link",
+ "example_page": "https://www.tandfonline.com/doi/full/10.1080/19491247.2019.1682234",
+ },
+ {
+ "in_doc_url": "article_id=",
+ "in_fulltext_url": "download.php",
+ "selector": "a.file.pdf",
+ "attr": "href",
+ "technique": "pdf file link",
+ "example_page": "http://journals.tsu.ru/psychology/&journal_page=archive&id=1815&article_id=40405",
+ },
+ {
+ "in_doc_url": "/content/10.",
+ "in_fulltext_url": "pdf",
+ "selector": "a.pdf[title='Download']",
+ "attr": "href",
+ "technique": "pdf file link",
+ "example_page": "https://www.eurosurveillance.org/content/10.2807/1560-7917.ES.2020.25.11.2000230",
+ },
+ {
+ "selector": "embed[type='application/pdf']",
+ "attr": "src",
+ "technique": "PDF embed",
+ "example_page": "http://www.jasstudies.com/DergiTamDetay.aspx?ID=3401",
+ },
+ {
+ "in_doc_url": "/html/",
+ "in_fulltext_url": "create_pdf",
+ "selector": ".AbsPdfFigTab img[src='images/pdf-icon.jpg'] + a",
+ "attr": "href",
+ "technique": "PDF URL link",
+ "example_page": "http://www.aed.org.cn/nyzyyhjxb/html/2018/4/20180408.htm",
+ },
+ {
+ "in_doc_url": "/archive-detail/",
+ "in_fulltext_url": ".pdf",
+ "selector": ".contact-list a.download-pdf",
+ "attr": "href",
+ "technique": "PDF URL link",
+ "example_page": "http://www.bezmialemscience.org/archives/archive-detail/article-preview/editorial/20439",
+ },
+ {
+ "in_doc_url": "degruyter.com/document/",
+ "in_fulltext_url": "/pdf",
+ "selector": "a.downloadPdf",
+ "attr": "href",
+ "technique": "PDF URL link",
+ "example_page": "https://www.degruyter.com/document/doi/10.1515/zaw-2021-0001/html",
+ },
+ {
+ "in_doc_url": "repositorio.unicamp.br/handle/",
+ "in_fulltext_url": "/bitstream/",
+ "selector": "table.panel-body a[target='_blank']",
+ "attr": "href",
+ "technique": "PDF URL link",
+ "example_page": "http://www.repositorio.unicamp.br/handle/REPOSIP/287750",
+ },
+ {
+ "in_doc_url": "dlc.library.columbia.edu/durst/",
+ "selector": "dd.blacklight-lib_non_item_in_context_url_ssm a[href]",
+ "attr": "href",
+ "technique": "Access URL link",
+ "example_page": "https://dlc.library.columbia.edu/durst/cul:18931zcrk9",
+ },
+ {
+ "in_doc_url": "fldeploc.dep.state.fl.us/geodb_query/fgs_doi",
+ "in_fulltext_url": "pdf",
+ "selector": "p a[href]",
+ "attr": "href",
+ "technique": "PDF URL link",
+ "example_page": "http://fldeploc.dep.state.fl.us/geodb_query/fgs_doi.asp?searchCode=IC29",
+ },
+ {
+ "in_doc_url": "preprints.jmir.org/preprint/",
+ "selector": "a.pdf-download-button",
+ "attr": "href",
+ "technique": "PDF URL link",
+ "example_page": "https://preprints.jmir.org/preprint/22556",
+ },
+ {
+ "in_doc_url": "bloomsburycollections.com/",
+ "in_fulltext_url": "pdf",
+ "selector": "li.download-item a[href]",
+ "attr": "href",
+ "technique": "PDF URL link",
+ "example_page": "https://www.bloomsburycollections.com/book/the-political-economies-of-media-the-transformation-of-the-global-media-industries/the-political-economies-of-media-and-the-transformation-of-the-global-media-industries",
+ },
+ {
+ "in_doc_url": "emerald.com/insight/content/",
+ "in_fulltext_url": "pdf",
+ "selector": "a.intent_pdf_link",
+ "attr": "href",
+ "technique": "PDF URL link",
+ "example_page": "https://www.emerald.com/insight/content/doi/10.1108/RAMJ-11-2020-0065/full/html",
+ },
+ {
+ "in_doc_url": "ingentaconnect.com/content/",
+ "in_fulltext_url": "pdf",
+ "selector": "a.pdf[data-popup]",
+ "attr": "data-popup",
+ "technique": "PDF URL link",
+ "example_page": "https://www.ingentaconnect.com/content/ista/sst/2021/00000049/00000001/art00007",
+ },
+ {
+ "in_doc_url": "library.wur.nl/",
+ "in_fulltext_url": "pdf",
+ "selector": "a.wl_full_text_restricted",
+ "attr": "href",
+ "technique": "PDF URL link",
+ "example_page": "https://library.wur.nl/WebQuery/wurpubs/529922",
+ },
+ {
+ "in_doc_url": "/dlibra/",
+ "in_fulltext_url": "pdf",
+ "selector": "iframe#js-main-frame",
+ "attr": "src",
+ "technique": "PDF iframe (dlibra)",
+ "example_page": "https://dbc.wroc.pl/dlibra/docmetadata?showContent=true&id=41031",
+ },
+ {
+ "in_doc_url": "/handle/",
+ "in_fulltext_url": "pdf",
+ "selector": "table.misc table.inner tr.b a",
+ "attr": "href",
+ "technique": "PDF URL link (DSpace, first file)",
+ "example_page": "https://orbi.uliege.be/handle/2268/174200",
+ },
+ {
+ "in_doc_url": "/publications/",
+ "in_fulltext_url": "pdf",
+ "selector": ".publication-sidebar li.open-access a.document-link",
+ "attr": "href",
+ "technique": "PDF URL link (Pure repo, OA link)",
+ "example_page": "https://research.tue.nl/en/publications/lowering-the-threshold-for-computers-in-early-design-some-advance",
+ },
+ {
+ "in_doc_url": "//hal",
+ "selector": ".widget-openaccess .widget-content a",
+ "attr": "href",
+ "technique": "Fulltext OA URL (HAL)",
+ "example_page": "https://hal.archives-ouvertes.fr/hal-00744951",
+ },
+ {
+ "in_doc_url": "/record/",
+ "in_fulltext_url": "pdf",
+ "selector": "#detailedrecordminipanelfile a",
+ "attr": "href",
+ "technique": "PDF URL link (Invenio)",
+ "example_page": "https://bib-pubdb1.desy.de/record/416556",
+ },
+ {
+ "in_doc_url": "/available/",
+ "in_fulltext_url": "pdf",
+ "selector": "table.file-table a",
+ "attr": "href",
+ "technique": "PDF URL link",
+ "example_page": "https://etd.adm.unipi.it/theses/available/etd-05302014-183910/",
+ },
+ {
+ "in_doc_url": "/islandora/",
+ "in_fulltext_url": "pdf",
+ "selector": "a.islandora-pdf-link",
+ "attr": "href",
+ "technique": "PDF URL link (Islandora)",
+ "example_page": "http://fau.digital.flvc.org/islandora/object/fau%3A9804",
+ },
+ {
+ "in_doc_url": "/receive/",
+ "in_fulltext_url": "pdf",
+ "selector": ".mir-preview noscript a",
+ "attr": "href",
+ "technique": "PDF iframe via noscript (MyCoRe)",
+ "example_page": "https://www.db-thueringen.de/receive/dbt_mods_00005191",
+ },
+ {
+ "in_doc_url": "/registro.do",
+ "in_fulltext_url": "imagenes",
+ "selector": ".resumen_bib a[data-analytics=media]",
+ "attr": "href",
+ "technique": "Media link (DIGIBIS)",
+ "example_page": "https://bivaldi.gva.es/es/consulta/registro.do?id=11740",
+ },
+ {
+ "in_doc_url": "/view",
+ "in_fulltext_url": "/at_download/",
+ "selector": ".documentContent #content a",
+ "attr": "href",
+ "technique": "Media link (Plone)",
+ "example_page": "http://xjornadaslc.fahce.unlp.edu.ar/actas/Ramon_Esteban_Chaparro.pdf/view",
+ },
+ {
+ "in_doc_url": "isca-speech.org/",
+ "in_fulltext_url": "pdf",
+ "selector": ".w3-container a",
+ "attr": "href",
+ "technique": "PDF URL link (isca-speech.org)",
+ "example_page": "https://www.isca-speech.org/archive/interspeech_2006/chitturi06b_interspeech.html",
+ },
+ {
+ "in_doc_url": "://repository.dri.ie/",
+ "in_fulltext_url": "/download",
+ "selector": "#dri_download_assets > div > a",
+ "attr": "href",
+ "technique": "Download link (repository.dri.ie)",
+ "example_page": "https://repository.dri.ie/catalog/qf8621102",
+ },
+ {
+ "in_doc_url": "frontiersin.org/",
+ "in_fulltext_url": "pdf",
+ "selector": "a.download-files-pdf",
+ "attr": "href",
+ "technique": "PDF Download link (frontiersin.org)",
+ "example_page": "https://www.frontiersin.org/articles/10.3389/fnins.2021.722592/full",
+ },
+ {
+ "in_doc_url": "cureus.com/",
+ "in_fulltext_url": "pdf",
+ "selector": ".small-medium-pdf a.pdf-download-button",
+ "attr": "href",
+ "technique": "PDF Download link (cureus.com)",
+ "example_page": "https://www.cureus.com/articles/69542-tramadol-induced-jerks",
+ },
+ {
+ "in_doc_url": "e-manuscripta.ch/",
+ "in_fulltext_url": "pdf",
+ "selector": "#titleinfoPdfDownload a.resourceLink",
+ "attr": "href",
+ "technique": "PDF Download link (e-manuscripta.ch)",
+ "example_page": "https://www.e-manuscripta.ch/zut/doi/10.7891/e-manuscripta-112176",
+ },
+]
+
+FULLTEXT_URL_PATTERNS_SKIP = [
+ # wiley has a weird almost-blank page we don't want to loop on
+ "://onlinelibrary.wiley.com/doi/pdf/"
+ "://doi.org/"
+ "://dx.doi.org/"
+]
+
+RELEASE_TYPE_MAP = {
+ "research article": "article-journal",
+ "text.serial.journal": "article-journal",
+}
+
+
+class BiblioMetadata(pydantic.BaseModel):
+ title: Optional[str]
+ subtitle: Optional[str]
+ contrib_names: Optional[List[str]]
+ release_date: Optional[datetime.date]
+ release_year: Optional[int]
+ release_type: Optional[str]
+ release_stage: Optional[str]
+ withdrawn_status: Optional[str]
+ lang: Optional[str]
+ country_code: Optional[str]
+ volume: Optional[str]
+ issue: Optional[str]
+ number: Optional[str]
+ pages: Optional[str]
+ first_page: Optional[str]
+ last_page: Optional[str]
+ license: Optional[str]
+ publisher: Optional[str]
+ container_name: Optional[str]
+ container_abbrev: Optional[str]
+ container_issn: Optional[str]
+ container_type: Optional[str]
+ raw_references: Optional[List[str]]
+
+ doi: Optional[str]
+ pmid: Optional[str]
+ pmcid: Optional[str]
+ isbn13: Optional[str]
+ publisher_ident: Optional[str]
+ oai_id: Optional[str]
+
+ abstract: Optional[str]
+ pdf_fulltext_url: Optional[str]
+ html_fulltext_url: Optional[str]
+ xml_fulltext_url: Optional[str]
+ component_url: Optional[str]
+
+ class Config:
+ json_encoders = {
+ datetime.date: lambda dt: dt.isoformat()
+ }
+
+
+def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, patterns: List[dict]) -> Optional[Tuple[str, str]]:
+ """
+ Tries to quickly extract fulltext URLs using a set of patterns. This
+ function is intended to be generic across various extraction techniques.
+
+ Returns None or a tuple of (url, technique)
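+
+ A minimal usage sketch (values are illustrative):
+
+ doc = HTMLParser(html_body)
+ hit = html_extract_fulltext_url(url, doc, PDF_FULLTEXT_PATTERNS)
+ if hit:
+ fulltext_url, technique = hit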
+ """
+ self_doc_url: Optional[Tuple[str, str]] = None
+ for pattern in patterns:
+ if not 'selector' in pattern:
+ continue
+ if 'in_doc_url' in pattern:
+ if not pattern['in_doc_url'] in doc_url:
+ continue
+ elem = doc.css_first(pattern['selector'])
+ if not elem:
+ continue
+ if 'attr' in pattern:
+ val = elem.attrs.get(pattern['attr'])
+ if not val:
+ continue
+ val = urllib.parse.urljoin(doc_url, val)
+ assert val
+ if 'in_fulltext_url' in pattern:
+ if not pattern['in_fulltext_url'] in val:
+ continue
+ if any(skip_pattern in val.lower() for skip_pattern in FULLTEXT_URL_PATTERNS_SKIP):
+ continue
+ if url_fuzzy_equal(doc_url, val):
+ # don't link to self, unless no other options
+ self_doc_url = (val, pattern.get('technique', 'unknown'))
+ continue
+ return (val, pattern.get('technique', 'unknown'))
+ if self_doc_url:
+ print(f" WARN: returning fulltext URL pointing to self", file=sys.stderr)
+ return self_doc_url
+ return None
+
+def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadata]:
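+ """
+ Extracts bibliographic metadata from the <head> of an HTML document using
+ the HEAD_META_PATTERNS and HEAD_META_LIST_PATTERNS tables above, plus
+ (some) fulltext URL extraction. Returns None if the document has no <head>.
+ """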
+
+ meta: Any = dict()
+ head = doc.css_first("head")
+ if not head:
+ return None
+
+ for field, patterns in HEAD_META_PATTERNS.items():
+ for pattern in patterns:
+ val = head.css_first(pattern)
+ #print((field, pattern, val))
+ if val and 'content' in val.attrs and val.attrs['content']:
+ meta[field] = val.attrs['content']
+ break
+
+ for field, patterns in HEAD_META_LIST_PATTERNS.items():
+ for pattern in patterns:
+ val_list = head.css(pattern)
+ if val_list:
+ for val in val_list:
+ if 'content' in val.attrs and val.attrs['content']:
+ if not field in meta:
+ meta[field] = []
+ meta[field].append(val.attrs['content'])
+ break
+
+ # (some) fulltext extractions
+ pdf_fulltext_url = html_extract_fulltext_url(doc_url, doc, PDF_FULLTEXT_PATTERNS)
+ if pdf_fulltext_url:
+ meta['pdf_fulltext_url'] = pdf_fulltext_url[0]
+ xml_fulltext_url = html_extract_fulltext_url(doc_url, doc, XML_FULLTEXT_PATTERNS)
+ if xml_fulltext_url:
+ meta['xml_fulltext_url'] = xml_fulltext_url[0]
+ html_fulltext_url = html_extract_fulltext_url(doc_url, doc, HTML_FULLTEXT_PATTERNS)
+ if html_fulltext_url:
+ meta['html_fulltext_url'] = html_fulltext_url[0]
+ component_url = html_extract_fulltext_url(doc_url, doc, COMPONENT_FULLTEXT_PATTERNS)
+ if component_url:
+ meta['component_url'] = component_url[0]
+
+ # TODO: replace with clean_doi() et al
+ if meta.get('doi') and meta.get('doi').startswith('doi:'):
+ meta['doi'] = meta['doi'][4:]
+
+ raw_identifiers = meta.pop('raw_identifiers', [])
+ for ident in raw_identifiers:
+ if ident.startswith('doi:10.'):
+ if not 'doi' in meta:
+ meta['doi'] = ident.replace('doi:', '')
+ elif ident.startswith('10.') and '/' in ident:
+ if not 'doi' in meta:
+ meta['doi'] = ident
+ elif ident.startswith('isbn:'):
+ if not 'isbn' in meta:
+ meta['isbn'] = ident.replace('isbn:', '')
+
+ raw_date = meta.pop('raw_date', None)
+ if raw_date:
+ parsed = dateparser.parse(raw_date)
+ if parsed:
+ meta['release_date'] = parsed.date()
+
+ raw_release_type = meta.pop('raw_release_type', None)
+ if raw_release_type:
+ release_type = RELEASE_TYPE_MAP.get(raw_release_type.lower().strip())
+ if release_type:
+ meta['release_type'] = release_type
+
+ return BiblioMetadata(**meta)
+
+def load_adblock_rules() -> braveblock.Adblocker:
+ """
+ TODO: consider blocking very generic assets:
+ - ://fonts.googleapis.com/css*
+ - ://journals.plos.org/plosone/resource/img/icon.*
+ """
+ return braveblock.Adblocker(
+ include_easylist=True,
+ include_easyprivacy=True,
+ rules=[
+ "/favicon.ico^",
+ "||fonts.googleapis.com^",
+ "||widgets.figshare.com^",
+ "||crossmark-cdn.crossref.org^",
+ "||crossmark.crossref.org^",
+ "||platform.twitter.com^",
+ "||verify.nature.com^",
+ "||s7.addthis.com^",
+ "||www.mendeley.com^",
+ "||pbs.twimg.com^",
+ "||badge.dimensions.ai^",
+ "||recaptcha.net^",
+
+ # not sure about these CC badges (usually via a redirect)
+ #"||licensebuttons.net^",
+ #"||i.creativecommons.org^",
+
+ # Should we skip jquery, or other generic javascript CDNs?
+ #"||code.jquery.com^",
+ #"||ajax.googleapis.com^",
+ #"||cdnjs.cloudflare.com^",
+
+ # badges, "share" buttons, tracking, etc
+ "apis.google.com/js/plusone",
+ "www.google.com/recaptcha/",
+ "js/_getUACode.js"
+
+ # PLOS images
+ "/resource/img/icon.*.16.png^",
+ ],
+ )
+
+
+def _extract_generic(doc: HTMLParser, selector: str, attrs: List[str], type_name: str) -> list:
+ resources = []
+
+ for node in doc.css(selector):
+ for attr in attrs:
+ if not attr in node.attrs:
+ continue
+ url = node.attrs.get(attr)
+ # special-case a couple meta URI prefixes which don't match with adblock rules
+ skip = False
+ for prefix in ['about:', 'data:', 'magnet:', 'urn:', 'mailto:']:
+ if url and url.startswith(prefix):
+ skip = True
+ break
+ if skip:
+ continue
+ if url:
+ #print(url, file=sys.stderr)
+ resources.append(dict(url=url.strip(), type=type_name))
+
+ return resources
+
+
+def html_extract_resources(doc_url: str, doc: HTMLParser, adblock: braveblock.Adblocker) -> list:
+ """
+ This function tries to find all the important resources in a page. The
+ presumption is that the HTML document is article fulltext, and we want the
+ list of all resources (by URL) necessary to replay the page.
+
+ The returned resource URLs each have a type (script, img, css, etc), and
+ should be fully-qualified URLs (not relative).
+
+ Adblock filtering is run to remove unwanted resources.
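+
+ Usage sketch (illustrative):
+
+ adblock = load_adblock_rules()
+ resources = html_extract_resources(url, HTMLParser(html_body), adblock)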
+ """
+ resources = []
+
+ # select various resource references
+ resources += _extract_generic(doc, "script", ["src"], "script")
+ resources += _extract_generic(doc, "link[rel='stylesheet']", ["href"], "stylesheet")
+ # TODO: srcset and parse
+ # eg: https://dfzljdn9uc3pi.cloudfront.net/2018/4375/1/fig-5-2x.jpg 1200w, https://dfzljdn9uc3pi.cloudfront.net/2018/4375/1/fig-5-1x.jpg 600w, https://dfzljdn9uc3pi.cloudfront.net/2018/4375/1/fig-5-small.jpg 355w
+ resources += _extract_generic(doc, "img", ["src"], "image")
+ resources += _extract_generic(doc, "audio", ["src"], "audio")
+ resources += _extract_generic(doc, "video", ["src"], "media")
+ resources += _extract_generic(doc, "source", ["src"], "media")
+ resources += _extract_generic(doc, "track", ["src"], "media")
+ resources += _extract_generic(doc, "iframe", ["src"], "subdocument")
+ resources += _extract_generic(doc, "embed", ["src"], "media")
+
+ # ensure URLs are absolute
+ for r in resources:
+ r['url'] = urllib.parse.urljoin(doc_url, r['url'])
+
+ # filter using adblocker
+ resources = [r for r in resources if not adblock.check_network_urls(r['url'], source_url=doc_url, request_type=r['type'])]
+
+ # remove duplicates
+ resources = [dict(t) for t in {tuple(d.items()) for d in resources}]
+
+ return resources
+
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
new file mode 100644
index 0000000..c586972
--- /dev/null
+++ b/python/sandcrawler/ia.py
@@ -0,0 +1,1138 @@
+
+# XXX: some broken MRO thing going on in here due to python3 object wrangling
+# in `wayback` library. Means we can't run pylint.
+# pylint: skip-file
+
+import os
+import sys
+import time
+import gzip
+import json
+import requests
+import datetime
+import urllib.parse
+import urllib3.exceptions
+from typing import Tuple
+from collections import namedtuple
+
+import http.client
+
+# not sure this will really work. Should go before wayback imports.
+http.client._MAXHEADERS = 1000 # type: ignore
+
+import wayback.exception
+from http.client import IncompleteRead
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory3
+
+from .misc import b32_hex, requests_retry_session, gen_file_metadata, clean_url
+
+class SandcrawlerBackoffError(Exception):
+ """
+ A set of Exceptions which are raised through multiple abstraction layers to
+ indicate backpressure. For example, SPNv2 back-pressure sometimes needs to
+ be passed up through any timeout/retry code and become an actual long pause
+ or crash.
+ """
+ pass
+
+ResourceResult = namedtuple("ResourceResult", [
+ "start_url",
+ "hit",
+ "status",
+ "terminal_url",
+ "terminal_dt",
+ "terminal_status_code",
+ "body",
+ "cdx",
+ "revisit_cdx",
+])
+
+WarcResource = namedtuple("WarcResource", [
+ "status_code",
+ "location",
+ "body",
+ "revisit_cdx",
+])
+
+CdxRow = namedtuple('CdxRow', [
+ 'surt',
+ 'datetime',
+ 'url',
+ 'mimetype',
+ 'status_code',
+ 'sha1b32',
+ 'sha1hex',
+ 'warc_csize',
+ 'warc_offset',
+ 'warc_path',
+])
+
+CdxPartial = namedtuple('CdxPartial', [
+ 'surt',
+ 'datetime',
+ 'url',
+ 'mimetype',
+ 'status_code',
+ 'sha1b32',
+ 'sha1hex',
+])
+
+def cdx_partial_from_row(full):
+ return CdxPartial(
+ surt=full.surt,
+ datetime=full.datetime,
+ url=full.url,
+ mimetype=full.mimetype,
+ status_code=full.status_code,
+ sha1b32=full.sha1b32,
+ sha1hex=full.sha1hex,
+ )
+
+def cdx_to_dict(cdx):
+ d = {
+ "surt": cdx.surt,
+ "datetime": cdx.datetime,
+ "url": cdx.url,
+ "mimetype": cdx.mimetype,
+ "status_code": cdx.status_code,
+ "sha1b32": cdx.sha1b32,
+ "sha1hex": cdx.sha1hex,
+ }
+ if type(cdx) == CdxRow and '/' in cdx.warc_path:
+ d['warc_csize'] = cdx.warc_csize
+ d['warc_offset'] = cdx.warc_offset
+ d['warc_path'] = cdx.warc_path
+ return d
+
+def fuzzy_match_url(left, right):
+ """
+ Matches URLs agnostic of http/https (and maybe other normalizations in the
+ future)
+ """
+ if left == right:
+ return True
+ if '://' in left and '://' in right:
+ left = '://'.join(left.split('://')[1:])
+ right = '://'.join(right.split('://')[1:])
+ if left == right:
+ return True
+ if left == right + "/" or right == left + "/":
+ return True
+ return False
+
+def test_fuzzy_match_url():
+ assert fuzzy_match_url("http://thing.com", "http://thing.com") == True
+ assert fuzzy_match_url("http://thing.com", "https://thing.com") == True
+ assert fuzzy_match_url("http://thing.com", "ftp://thing.com") == True
+ assert fuzzy_match_url("http://thing.com", "http://thing.com/") == True
+ assert fuzzy_match_url("https://thing.com", "http://thing.com/") == True
+ assert fuzzy_match_url("https://thing.com/", "http://thing.com") == True
+ assert fuzzy_match_url("http://thing.com", "http://thing.com/blue") == False
+
+ # should probably handle these?
+ assert fuzzy_match_url("http://thing.com", "http://www.thing.com") == False
+ assert fuzzy_match_url("http://www.thing.com", "http://www2.thing.com") == False
+ assert fuzzy_match_url("http://www.thing.com", "https://www2.thing.com") == False
+
+class CdxApiError(Exception):
+ pass
+
+class CdxApiClient:
+
+ def __init__(self, host_url="https://web.archive.org/cdx/search/cdx", **kwargs):
+ self.host_url = host_url
+ self.http_session = requests_retry_session(retries=3, backoff_factor=3)
+ cdx_auth_token = kwargs.get('cdx_auth_token',
+ os.environ.get('CDX_AUTH_TOKEN'))
+ if not cdx_auth_token:
+ raise Exception("CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)")
+ self.http_session.headers.update({
+ 'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient',
+ 'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token),
+ })
+
+ def _query_api(self, params):
+ """
+ Hits CDX API with a query, parses result into a list of CdxRow
+ """
+ resp = self.http_session.get(self.host_url, params=params)
+ if resp.status_code != 200:
+ raise CdxApiError(resp.text)
+ #print(resp.url, file=sys.stderr)
+ if not resp.text:
+ return None
+ rj = resp.json()
+ if len(rj) <= 1:
+ return None
+ rows = []
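+ # The CDX JSON output has a header as the first row; each following row is
+ # an 11-field list. Fields used below: surt (0), datetime (1), url (2),
+ # mimetype (3), status code (4), SHA1-B32 digest (5), WARC compressed
+ # size (8), WARC offset (9), and WARC path (10).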
+ for raw in rj[1:]:
+ # check number of CDX fields; there is a bug with some rows having
+ # spaces in WARC filename resulting in extra bogus fields
+ if len(raw) != 11:
+ raise CdxApiError(f"CDX response had {len(raw)} fields, not 11 expected")
+
+ # transform "-" ftp status code to a 226
+ status_code = None
+ if raw[4] == "-":
+ if raw[3] != "warc/revisit" and raw[2].startswith("ftp://"):
+ status_code = 226
+ else:
+ status_code = int(raw[4])
+
+ # CDX rows with no WARC records?
+ if raw[8] == '-' or raw[9] == '-' or raw[10] == '-':
+ continue
+
+ row = CdxRow(
+ surt=raw[0],
+ datetime=raw[1],
+ url=raw[2],
+ mimetype=raw[3],
+ status_code=status_code,
+ sha1b32=raw[5],
+ sha1hex=b32_hex(raw[5]),
+ warc_csize=int(raw[8]),
+ warc_offset=int(raw[9]),
+ warc_path=raw[10],
+ )
+ assert (row.mimetype == "-") or ("-" not in row)
+ rows.append(row)
+ return rows
+
+ def fetch(self, url, datetime, filter_status_code=None, retry_sleep=None):
+ """
+ Fetches a single CDX row by url/datetime. Raises a KeyError if not
+ found, because we expect to be looking up a specific full record.
+ """
+ if len(datetime) != 14:
+ raise ValueError("CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime))
+ params = {
+ 'url': url,
+ 'from': datetime,
+ 'to': datetime,
+ 'matchType': 'exact',
+ 'limit': 1,
+ 'output': 'json',
+ }
+ if filter_status_code:
+ params['filter'] = "statuscode:{}".format(filter_status_code)
+ resp = self._query_api(params)
+ if not resp:
+ if retry_sleep and retry_sleep > 0:
+ next_sleep = None
+ if retry_sleep > 3:
+ next_sleep = retry_sleep - 3
+ retry_sleep = 3
+ print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr)
+ time.sleep(retry_sleep)
+ return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=next_sleep)
+ raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime))
+ row = resp[0]
+ # allow fuzzy http/https match
+ if not (fuzzy_match_url(row.url, url) and row.datetime == datetime):
+ if retry_sleep and retry_sleep > 0:
+ print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr)
+ time.sleep(retry_sleep)
+ return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=None)
+ raise KeyError("Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(url, datetime, row))
+ if filter_status_code:
+ assert row.status_code == filter_status_code
+ return row
+
+ def lookup_best(self, url, max_age_days=None, best_mimetype=None, closest=None):
+ """
+ Fetches multiple CDX rows for the given URL, tries to find the most recent.
+
+ If no matching row is found, return None. Note this is different from fetch.
+
+ Preference order by status code looks like:
+
+ 200 or 226
+ mimetype match
+ not-liveweb
+ most-recent
+ no match
+ not-liveweb
+ most-recent
+ 3xx
+ most-recent
+ 4xx
+ most-recent
+ 5xx
+ most-recent
+
+ """
+ params = {
+ 'url': url,
+ 'matchType': 'exact',
+ 'limit': -25,
+ 'output': 'json',
+ # Collapsing seems efficient, but is complex; would need to include
+ # other filters and status code in filter
+ #'collapse': 'timestamp:6',
+
+ # Revisits now allowed and resolved!
+ #'filter': '!mimetype:warc/revisit',
+ }
+ if max_age_days:
+ since = datetime.date.today() - datetime.timedelta(days=max_age_days)
+ params['from'] = '%04d%02d%02d' % (since.year, since.month, since.day)
+ if closest:
+ params['closest'] = closest
+ params['sort'] = "closest"
+ #print(params, file=sys.stderr)
+ rows = self._query_api(params)
+ if not rows:
+ return None
+
+ def _cdx_sort_key(r):
+ """
+ This is a function, not a lambda, because it captures
+ best_mimetype. Will create a tuple that can be used to sort in
+ *reverse* order.
+ """
+ return (
+ int(r.status_code in (200, 226)),
+ int(0 - (r.status_code or 999)),
+ int(r.mimetype == best_mimetype),
+ int(r.mimetype != "warc/revisit"),
+ int(r.datetime[:6]),
+ int('/' in r.warc_path),
+ int(r.datetime),
+ )
+
+ rows = sorted(rows, key=_cdx_sort_key)
+ return rows[-1]
+
+
+class WaybackError(Exception):
+ pass
+
+class WaybackContentError(Exception):
+ pass
+
+class PetaboxError(Exception):
+ pass
+
+class NoCaptureError(Exception):
+ pass
+
+class WaybackClient:
+
+ def __init__(self, cdx_client=None, **kwargs):
+ if cdx_client:
+ self.cdx_client = cdx_client
+ else:
+ self.cdx_client = CdxApiClient()
+ # /serve/ instead of /download/ doesn't record view count
+ # this *does* want to be http://, not https://
+ self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/')
+ # gwb library will fall back to reading from /opt/.petabox/webdata.secret
+ self.petabox_webdata_secret = kwargs.get(
+ 'petabox_webdata_secret',
+ os.environ.get('PETABOX_WEBDATA_SECRET'),
+ )
+ self.warc_uri_prefix = kwargs.get('warc_uri_prefix', 'https://archive.org/serve/')
+ self.rstore = None
+ self.max_redirects = 25
+ self.wayback_endpoint = "https://web.archive.org/web/"
+ self.replay_headers = {
+ 'User-Agent': 'Mozilla/5.0 sandcrawler.WaybackClient',
+ }
+
+ def fetch_petabox(self, csize, offset, warc_path, resolve_revisit=True):
+ """
+ Fetches wayback resource directly from petabox using WARC path/offset/csize.
+
+ If there is a problem with petabox, raises a PetaboxError.
+ If resource doesn't exist, would raise a KeyError (TODO).
+
+ The body is only returned if the record is success (HTTP 200 or
+ equivalent). Otherwise only the status and header info is returned.
+
+ WarcResource object (namedtuple) contains fields:
+ - status_code: int
+ - location: eg, for redirects
+ - body: raw bytes
+
+ resolve_revisit does what it sounds like: tries following a revisit
+ record via a CDX API lookup and then another fetch. Refuses to recurse
+ more than one hop (eg, won't follow a chain of revisits).
+
+ Requires (and uses) a secret token.
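+
+ Rough usage sketch (values are illustrative):
+
+ resource = wayback_client.fetch_petabox(
+ csize=cdx_row.warc_csize,
+ offset=cdx_row.warc_offset,
+ warc_path=cdx_row.warc_path,
+ )
+ if resource.status_code in (200, 226):
+ body = resource.body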
+ """
+ if not self.petabox_webdata_secret:
+ raise Exception("WaybackClient needs petabox secret to do direct WARC fetches")
+ if not "/" in warc_path:
+ raise ValueError("what looks like a liveweb/SPN temporary warc path: {}".format(warc_path))
+ warc_uri = self.warc_uri_prefix + warc_path
+ if not self.rstore:
+ self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory3(
+ webdata_secret=self.petabox_webdata_secret,
+ ))
+ try:
+ #print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr)
+ gwb_record = self.rstore.load_resource(warc_uri, offset, csize)
+ except wayback.exception.ResourceUnavailable:
+ print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
+ raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)")
+ except wayback.exception.InvalidResource:
+ print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
+ raise WaybackContentError("failed to load file contents from wayback/petabox (InvalidResource)")
+ except urllib3.exceptions.ReadTimeoutError as rte:
+ raise PetaboxError("failed to load file contents from wayback/petabox (ReadTimeoutError: {})".format(rte))
+ except ValueError as ve:
+ raise PetaboxError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
+ except EOFError as eofe:
+ raise PetaboxError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ except TypeError as te:
+ raise PetaboxError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
+ except Exception as e:
+ if "while decompressing data: invalid block type" in str(e):
+ raise PetaboxError("decompression error fetching WARC record; usually due to bad alexa ARC files")
+ else:
+ raise e
+ # Note: could consider a generic "except Exception" here, as we get so
+ # many petabox errors. Do want jobs to fail loud and clear when the
+ # whole cluster is down though.
+
+ try:
+ status_code = gwb_record.get_status()[0]
+ except http.client.HTTPException:
+ raise WaybackContentError("too many HTTP headers (in wayback fetch)")
+ location = gwb_record.get_location() or None
+
+ if status_code is None and gwb_record.target_uri.startswith(b"ftp://") and not gwb_record.is_revisit():
+ # TODO: some additional verification here?
+ status_code = 226
+
+ body = None
+ revisit_cdx = None
+ if gwb_record.is_revisit():
+ if not resolve_revisit:
+ raise WaybackContentError("found revisit record, but won't resolve (loop?)")
+ revisit_uri, revisit_dt = gwb_record.refers_to
+ if not (revisit_uri and revisit_dt):
+ raise WaybackContentError("revisit record missing URI and/or DT: warc:{} offset:{}".format(
+ warc_path, offset))
+ # convert revisit_dt
+ # len("2018-07-24T11:56:49"), or with "Z"
+ assert len(revisit_dt) in (19, 20)
+ if type(revisit_uri) is bytes:
+ revisit_uri = revisit_uri.decode('utf-8')
+ if type(revisit_dt) is bytes:
+ revisit_dt = revisit_dt.decode('utf-8')
+ revisit_dt = revisit_dt.replace('-', '').replace(':', '').replace('T', '').replace('Z', '')
+ assert len(revisit_dt) == 14
+ try:
+ revisit_cdx = self.cdx_client.fetch(revisit_uri, revisit_dt)
+ body = self.fetch_petabox_body(
+ csize=revisit_cdx.warc_csize,
+ offset=revisit_cdx.warc_offset,
+ warc_path=revisit_cdx.warc_path,
+ resolve_revisit=False,
+ expected_status_code=revisit_cdx.status_code,
+ )
+ except KeyError as ke:
+ raise WaybackError("Revist resolution failed: {}".format(ke))
+ elif status_code in (200, 226):
+ try:
+ body = gwb_record.open_raw_content().read()
+ except IncompleteRead as ire:
+ raise WaybackError(
+ "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ elif status_code is None:
+ raise WaybackContentError(
+ "got a None status_code in (W)ARC record")
+ return WarcResource(
+ status_code=status_code,
+ location=location,
+ body=body,
+ revisit_cdx=revisit_cdx,
+ )
+
+ def fetch_petabox_body(self, csize, offset, warc_path, resolve_revisit=True, expected_status_code=None):
+ """
+ Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize.
+
+ Returns bytes. Raises KeyError if resource wasn't an HTTP 200.
+
+ Thin helper around fetch_petabox()
+ """
+ resource = self.fetch_petabox(
+ csize=csize,
+ offset=offset,
+ warc_path=warc_path,
+ resolve_revisit=resolve_revisit,
+ )
+
+ if expected_status_code:
+ if expected_status_code != resource.status_code:
+ raise KeyError("archived HTTP response (WARC) was not {}: {}".format(
+ expected_status_code,
+ resource.status_code,
+ )
+ )
+ elif resource.status_code not in (200, 226):
+ raise KeyError("archived HTTP response (WARC) was not 200: {}".format(
+ resource.status_code)
+ )
+
+ return resource.body
+
+ def fetch_replay_body(self, url, datetime, cdx_sha1hex=None):
+ """
+ Fetches an HTTP 200 record from wayback via the replay interface
+ (web.archive.org) instead of petabox.
+
+ Intended for use with SPN2 requests, where request body has not ended
+ up in petabox yet.
+
+ If cdx_sha1hex is passed, will try to verify fetched body. Note that
+ this check *won't work* in many cases, due to CDX hash being of
+ compressed transfer data, not the uncompressed final content bytes.
+
+ TODO: could instead try to verify that we got the expected replay body
+ using... new X-Archive headers?
+ """
+
+ # defensively check datetime format
+ assert len(datetime) == 14
+ assert datetime.isdigit()
+
+ try:
+ resp = requests.get(
+ self.wayback_endpoint + datetime + "id_/" + url,
+ allow_redirects=False,
+ headers=self.replay_headers,
+ )
+ except requests.exceptions.TooManyRedirects:
+ raise WaybackContentError("redirect loop (wayback replay fetch)")
+ except requests.exceptions.ChunkedEncodingError:
+ raise WaybackError("ChunkedEncodingError (wayback replay fetch)")
+ except UnicodeDecodeError:
+ raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url))
+
+ try:
+ resp.raise_for_status()
+ except Exception as e:
+ raise WaybackError(str(e))
+ #print(resp.url, file=sys.stderr)
+
+ # defensively check that this is actually correct replay based on headers
+ if not "X-Archive-Src" in resp.headers:
+ raise WaybackError("replay fetch didn't return X-Archive-Src in headers")
+ if not datetime in resp.url:
+ raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url))
+
+ if cdx_sha1hex:
+ # verify that body matches CDX hash
+ # TODO: don't need *all* these hashes, just sha1
+ file_meta = gen_file_metadata(resp.content)
+ if cdx_sha1hex != file_meta['sha1hex']:
+ print(" REPLAY MISMATCH: cdx:{} replay:{}".format(
+ cdx_sha1hex,
+ file_meta['sha1hex']),
+ file=sys.stderr)
+ raise WaybackContentError("replay fetch body didn't match CDX hash cdx:{} body:{}".format(
+ cdx_sha1hex,
+ file_meta['sha1hex']),
+ )
+ return resp.content
+
+ def fetch_replay_redirect(self, url, datetime):
+ """
+ Fetches an HTTP 3xx redirect Location from wayback via the replay interface
+ (web.archive.org) instead of petabox.
+
+ Intended for use with SPN2 requests, where request body has not ended
+ up in petabox yet. For example, re-ingesting a base_url which was
+ recently crawled by SPNv2, where we are doing ingest via the wayback path.
+
+ Returns None if a response is found but no redirect Location could be extracted.
+ """
+
+ # defensively check datetime format
+ assert len(datetime) == 14
+ assert datetime.isdigit()
+
+ try:
+ resp = requests.get(
+ self.wayback_endpoint + datetime + "id_/" + url,
+ allow_redirects=False,
+ headers=self.replay_headers,
+ )
+ except requests.exceptions.TooManyRedirects:
+ raise WaybackContentError("redirect loop (wayback replay fetch)")
+ except UnicodeDecodeError:
+ raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url))
+ try:
+ resp.raise_for_status()
+ except Exception as e:
+ raise WaybackError(str(e))
+ #print(resp.url, file=sys.stderr)
+
+ # defensively check that this is actually correct replay based on headers
+ # previously check for "X-Archive-Redirect-Reason" here
+ if not "X-Archive-Src" in resp.headers:
+ raise WaybackError("redirect replay fetch didn't return X-Archive-Src in headers")
+ if not datetime in resp.url:
+ raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url))
+
+ redirect_url = resp.headers.get("Location")
+ # eg, https://web.archive.org/web/20200111003923id_/https://dx.doi.org/10.17504/protocols.io.y2gfybw
+ #print(redirect_url, file=sys.stderr)
+ if redirect_url and redirect_url.startswith("https://web.archive.org/web/"):
+ redirect_url = "/".join(redirect_url.split("/")[5:])
+ #print(redirect_url, file=sys.stderr)
+ if redirect_url and redirect_url.startswith("http"):
+ redirect_url = clean_url(redirect_url)
+ return redirect_url
+ else:
+ return None
+
+ def lookup_resource(self, start_url, best_mimetype=None, closest=None):
+ """
+ Looks in wayback for a resource starting at the URL, following any
+ redirects. Returns a ResourceResult object, which may indicate a
+ failure to fetch the resource.
+
+ Only raises exceptions on remote service failure or unexpected
+ problems.
+
+ In a for loop:
+
+ lookup "best" CDX
+ redirect status code?
+ fetch wayback
+ continue
+ success (200)?
+ fetch wayback
+ return success
+ bad (other status)?
+ return failure
+
+ got to end?
+ return failure; too many redirects
+ """
+ next_url = start_url
+ urls_seen = [start_url]
+ for i in range(self.max_redirects):
+ print(" URL: {}".format(next_url), file=sys.stderr)
+ cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype, closest=closest)
+ #print(cdx_row, file=sys.stderr)
+ if not cdx_row:
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="no-capture",
+ terminal_url=next_url,
+ terminal_dt=None,
+ terminal_status_code=None,
+ body=None,
+ cdx=None,
+ revisit_cdx=None,
+ )
+
+ # first try straight-forward redirect situation
+ if cdx_row.mimetype == "warc/revisit" and '/' in cdx_row.warc_path:
+ resource = self.fetch_petabox(
+ csize=cdx_row.warc_csize,
+ offset=cdx_row.warc_offset,
+ warc_path=cdx_row.warc_path,
+ )
+ if resource.revisit_cdx and resource.revisit_cdx.status_code in (200, 226):
+ return ResourceResult(
+ start_url=start_url,
+ hit=True,
+ status="success",
+ terminal_url=cdx_row.url,
+ terminal_dt=cdx_row.datetime,
+ terminal_status_code=resource.revisit_cdx.status_code,
+ body=resource.body,
+ cdx=cdx_row,
+ revisit_cdx=resource.revisit_cdx,
+ )
+ # else, continue processing with revisit record
+
+ if cdx_row.status_code in (200, 226):
+ revisit_cdx = None
+ if '/' in cdx_row.warc_path:
+ resource = self.fetch_petabox(
+ csize=cdx_row.warc_csize,
+ offset=cdx_row.warc_offset,
+ warc_path=cdx_row.warc_path,
+ )
+ body = resource.body
+ revisit_cdx = resource.revisit_cdx
+ else:
+ body = self.fetch_replay_body(
+ url=cdx_row.url,
+ datetime=cdx_row.datetime,
+ )
+ cdx_row = cdx_partial_from_row(cdx_row)
+ return ResourceResult(
+ start_url=start_url,
+ hit=True,
+ status="success",
+ terminal_url=cdx_row.url,
+ terminal_dt=cdx_row.datetime,
+ terminal_status_code=cdx_row.status_code,
+ body=body,
+ cdx=cdx_row,
+ revisit_cdx=revisit_cdx,
+ )
+ elif 300 <= (cdx_row.status_code or 0) < 400:
+ if '/' in cdx_row.warc_path:
+ resource = self.fetch_petabox(
+ csize=cdx_row.warc_csize,
+ offset=cdx_row.warc_offset,
+ warc_path=cdx_row.warc_path,
+ resolve_revisit=False,
+ )
+ assert 300 <= resource.status_code < 400
+ if not resource.location:
+ print(" bad redirect record: {}".format(cdx_row), file=sys.stderr)
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="bad-redirect",
+ terminal_url=cdx_row.url,
+ terminal_dt=cdx_row.datetime,
+ terminal_status_code=cdx_row.status_code,
+ body=None,
+ cdx=cdx_row,
+ revisit_cdx=None,
+ )
+ if not "://" in resource.location:
+ next_url = urllib.parse.urljoin(next_url, resource.location)
+ else:
+ next_url = resource.location
+ if next_url:
+ next_url = clean_url(next_url)
+ else:
+ next_url = self.fetch_replay_redirect(
+ url=cdx_row.url,
+ datetime=cdx_row.datetime,
+ )
+ if next_url:
+ next_url = clean_url(next_url)
+ cdx_row = cdx_partial_from_row(cdx_row)
+ if not next_url:
+ print(" bad redirect record: {}".format(cdx_row), file=sys.stderr)
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="bad-redirect",
+ terminal_url=cdx_row.url,
+ terminal_dt=cdx_row.datetime,
+ terminal_status_code=cdx_row.status_code,
+ body=None,
+ cdx=cdx_row,
+ revisit_cdx=None,
+ )
+ if next_url in urls_seen:
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="redirect-loop",
+ terminal_url=cdx_row.url,
+ terminal_dt=cdx_row.datetime,
+ terminal_status_code=cdx_row.status_code,
+ body=None,
+ cdx=cdx_row,
+ revisit_cdx=None,
+ )
+ urls_seen.append(next_url)
+ continue
+ else:
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="terminal-bad-status",
+ terminal_url=cdx_row.url,
+ terminal_dt=cdx_row.datetime,
+ terminal_status_code=cdx_row.status_code,
+ body=None,
+ cdx=cdx_row,
+ revisit_cdx=None,
+ )
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="redirects-exceeded",
+ terminal_url=cdx_row.url,
+ terminal_dt=cdx_row.datetime,
+ terminal_status_code=cdx_row.status_code,
+ body=None,
+ cdx=cdx_row,
+ revisit_cdx=None,
+ )
+
+
+class SavePageNowError(Exception):
+ pass
+
+class SavePageNowBackoffError(SandcrawlerBackoffError):
+ pass
+
+SavePageNowResult = namedtuple('SavePageNowResult', [
+ 'success',
+ 'status',
+ 'job_id',
+ 'request_url',
+ 'terminal_url',
+ 'terminal_dt',
+ 'resources',
+])
+
+class SavePageNowClient:
+
+ def __init__(self, v2endpoint="https://web.archive.org/save", **kwargs):
+ self.ia_access_key = kwargs.get('ia_access_key',
+ os.environ.get('IA_ACCESS_KEY'))
+ self.ia_secret_key = kwargs.get('ia_secret_key',
+ os.environ.get('IA_SECRET_KEY'))
+ self.v2endpoint = v2endpoint
+ self.v2_session = requests_retry_session(retries=5, backoff_factor=3)
+ self.v2_session.headers.update({
+ 'User-Agent': 'Mozilla/5.0 sandcrawler.SavePageNowClient',
+ 'Accept': 'application/json',
+ 'Authorization': 'LOW {}:{}'.format(self.ia_access_key, self.ia_secret_key),
+ })
+
+ # 3 minutes total
+ self.poll_count = 60
+ self.poll_seconds = 3.0
+
+ self.spn_cdx_retry_sec = kwargs.get('spn_cdx_retry_sec', 9.0)
+
+ def save_url_now_v2(self, request_url, force_simple_get=0, capture_outlinks=0):
+ """
+ Returns a "SavePageNowResult" (namedtuple) if SPN request was processed
+ at all, or raises an exception if there was an error with SPN itself.
+
+ If SPN2 was unable to fetch the remote content, `success` will be
+ false and status will be indicated.
+
+ SavePageNowResult fields:
+ - success: boolean; whether the SPN2 capture was successful
+ - status: "success" or an error message/type
+ - job_id: returned by API
+ - request_url: url we asked to fetch
+ - terminal_url: final primary resource (after any redirects)
+ - terminal_dt: wayback timestamp of final capture
+ - resources: list of all URLs captured
+
+ TODO: parse SPN error codes (status string) and handle better. Eg,
+ non-200 remote statuses, invalid hosts/URLs, timeouts, backoff, etc.
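+
+ Rough usage sketch (requires IA_ACCESS_KEY/IA_SECRET_KEY to be configured):
+
+ spn_client = SavePageNowClient()
+ result = spn_client.save_url_now_v2(request_url)
+ if result.success:
+ print(result.terminal_url, result.terminal_dt)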
+ """
+ if capture_outlinks:
+ print(" capturing outlinks!", file=sys.stderr)
+ if not (self.ia_access_key and self.ia_secret_key):
+ raise Exception("SPN2 requires authentication (IA_ACCESS_KEY/IA_SECRET_KEY)")
+ if request_url.startswith("ftp://"):
+ return SavePageNowResult(
+ False,
+ "spn2-no-ftp",
+ None,
+ request_url,
+ None,
+ None,
+ None,
+ )
+ resp = self.v2_session.post(
+ self.v2endpoint,
+ data={
+ 'url': request_url,
+ 'capture_all': 1,
+ 'capture_outlinks': capture_outlinks,
+ 'capture_screenshot': 0,
+ 'if_not_archived_within': '1d',
+ 'force_get': force_simple_get,
+ 'skip_first_archive': 1,
+ 'outlinks_availability': 0,
+ 'js_behavior_timeout': 0,
+ },
+ )
+ if resp.status_code == 429:
+ raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))
+ elif resp.status_code != 200:
+ raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url))
+ resp_json = resp.json()
+
+ if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']:
+ raise SavePageNowBackoffError(resp_json['message'])
+ elif not resp_json or 'job_id' not in resp_json:
+ raise SavePageNowError(
+ "Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json))
+
+ job_id = resp_json['job_id']
+ print(f" SPNv2 running: job_id={job_id} url={request_url}", file=sys.stderr)
+
+ # poll until complete
+ final_json = None
+ for i in range(self.poll_count):
+ resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, resp_json['job_id']))
+ try:
+ resp.raise_for_status()
+ except Exception:
+ raise SavePageNowError(resp.content)
+ status = resp.json()['status']
+ if status == 'pending':
+ time.sleep(self.poll_seconds)
+ elif status in ('success', 'error'):
+ final_json = resp.json()
+ break
+ else:
+ raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(status, request_url))
+
+ if not final_json:
+ raise SavePageNowError("SPN2 timed out (polling count exceeded)")
+
+ # if there was a recent crawl of same URL, fetch the status of that
+ # crawl to get correct datetime
+ if final_json.get('original_job_id'):
+ print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", file=sys.stderr)
+ resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, final_json['original_job_id']))
+ try:
+ resp.raise_for_status()
+ except Exception:
+ raise SavePageNowError(resp.content)
+ final_json = resp.json()
+
+ #print(final_json, file=sys.stderr)
+
+ if final_json['status'] == "success":
+ if final_json.get('original_url').startswith('/'):
+ print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", file=sys.stderr)
+ return SavePageNowResult(
+ True,
+ "success",
+ job_id,
+ request_url,
+ final_json['original_url'],
+ final_json['timestamp'],
+ final_json['resources'],
+ )
+ else:
+ if final_json['status'] == 'pending':
+ final_json['status'] = 'error:pending'
+ return SavePageNowResult(
+ False,
+ final_json.get('status_ext') or final_json['status'],
+ job_id,
+ request_url,
+ None,
+ None,
+ None,
+ )
+
+ def crawl_resource(self, start_url, wayback_client, force_simple_get=0):
+ """
+ Runs a SPN2 crawl, then fetches body.
+
+ There is a delay between SPN2 crawls and WARC upload to petabox, so we
+ need to fetch the body via wayback replay instead of petabox
+ range-request.
+ """
+
+ # HACK: capture CNKI domains with outlinks (for COVID-19 crawling)
+ if 'gzbd.cnki.net/' in start_url:
+ spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get, capture_outlinks=1)
+ else:
+ spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get)
+
+ if not spn_result.success:
+ status = spn_result.status
+ if status in ("error:invalid-url", "error:not-found",
+ "error:invalid-host-resolution", "error:gateway-timeout",
+ "error:too-many-redirects", "error:read-timeout"):
+ status = status.replace("error:", "")
+ elif status in ("error:no-access", "error:forbidden"):
+ status = "forbidden"
+ elif status == "error:user-session-limit":
+ raise SavePageNowBackoffError("SPNv2 user-session-limit")
+ elif status == "error:internal-server-error":
+ status = "remote-server-error"
+ elif status.startswith("error:"):
+ status = "spn2-" + status
+ # despite other errors, call these a failure (so we don't retry)
+ if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1")):
+ status = "blocked-cookie"
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status=status,
+ terminal_url=spn_result.terminal_url,
+ terminal_dt=spn_result.terminal_dt,
+ terminal_status_code=None,
+ body=None,
+ cdx=None,
+ revisit_cdx=None,
+ )
+ #print(spn_result, file=sys.stderr)
+
+ # detect partial URL response (aka, success, but missing full URL)
+ if not "://" in spn_result.terminal_url or spn_result.terminal_url.startswith('/'):
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="spn2-success-partial-url",
+ terminal_url=spn_result.terminal_url,
+ terminal_dt=spn_result.terminal_dt,
+ terminal_status_code=None,
+ body=None,
+ cdx=None,
+ revisit_cdx=None,
+ )
+
+ # don't try to CDX fetch for this common cookie block terminal
+ if spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"):
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="blocked-cookie",
+ terminal_url=spn_result.terminal_url,
+ terminal_dt=spn_result.terminal_dt,
+ terminal_status_code=None,
+ body=None,
+ cdx=None,
+ revisit_cdx=None,
+ )
+
+ cdx_row = None
+ # hack to work around elsevier weirdness
+ if "://pdf.sciencedirectassets.com/" in spn_result.request_url:
+ elsevier_pdf_cdx = wayback_client.cdx_client.lookup_best(
+ spn_result.request_url,
+ best_mimetype="application/pdf",
+ )
+ if elsevier_pdf_cdx and elsevier_pdf_cdx.mimetype == "application/pdf":
+ print(" Trying pdf.sciencedirectassets.com hack!", file=sys.stderr)
+ cdx_row = elsevier_pdf_cdx
+ else:
+ print(" Failed pdf.sciencedirectassets.com hack!", file=sys.stderr)
+ #print(elsevier_pdf_cdx, file=sys.stderr)
+
+ if not cdx_row:
+ # lookup exact
+ try:
+ filter_status_code = None
+ if spn_result.terminal_url.startswith("ftp://"):
+ filter_status_code = 226
+ cdx_row = wayback_client.cdx_client.fetch(
+ url=spn_result.terminal_url,
+ datetime=spn_result.terminal_dt,
+ filter_status_code=filter_status_code,
+ retry_sleep=self.spn_cdx_retry_sec,
+ )
+ # sometimes there are fuzzy http/https self-redirects with the
+ # same SURT; try to work around that
+ if cdx_row.status_code >= 300 and cdx_row.status_code < 400:
+ cdx_row = wayback_client.cdx_client.fetch(
+ url=spn_result.terminal_url,
+ datetime=spn_result.terminal_dt,
+ filter_status_code=200,
+ retry_sleep=self.spn_cdx_retry_sec,
+ )
+ except KeyError as ke:
+ print(" CDX KeyError: {}".format(ke), file=sys.stderr)
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="spn2-cdx-lookup-failure",
+ terminal_url=spn_result.terminal_url,
+ terminal_dt=spn_result.terminal_dt,
+ terminal_status_code=None,
+ body=None,
+ cdx=None,
+ revisit_cdx=None,
+ )
+
+ #print(cdx_row, file=sys.stderr)
+
+ revisit_cdx = None
+ if '/' in cdx_row.warc_path:
+ # Usually can't do this kind of direct fetch because CDX result is recent/live
+ resource = wayback_client.fetch_petabox(
+ csize=cdx_row.warc_csize,
+ offset=cdx_row.warc_offset,
+ warc_path=cdx_row.warc_path,
+ )
+ body = resource.body
+ if resource.revisit_cdx:
+ assert resource.revisit_cdx.sha1hex == cdx_row.sha1hex
+ revisit_cdx = resource.revisit_cdx
+ else:
+ # note: currently not trying to verify cdx_row.sha1hex
+ try:
+ body = wayback_client.fetch_replay_body(
+ url=cdx_row.url,
+ datetime=cdx_row.datetime,
+ )
+ except (WaybackError, WaybackContentError) as we:
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="spn2-wayback-error",
+ terminal_url=cdx_row.url,
+ terminal_dt=cdx_row.datetime,
+ terminal_status_code=None,
+ body=None,
+ cdx=None,
+ revisit_cdx=None,
+ )
+ # warc_path etc will change, so strip them out
+ cdx_row = cdx_partial_from_row(cdx_row)
+
+ assert cdx_row.status_code
+ if cdx_row.status_code in (200, 226):
+ return ResourceResult(
+ start_url=start_url,
+ hit=True,
+ status="success",
+ terminal_url=cdx_row.url,
+ terminal_dt=cdx_row.datetime,
+ terminal_status_code=cdx_row.status_code,
+ body=body,
+ cdx=cdx_row,
+ revisit_cdx=revisit_cdx,
+ )
+ else:
+ return ResourceResult(
+ start_url=start_url,
+ hit=False,
+ status="terminal-bad-status",
+ terminal_url=cdx_row.url,
+ terminal_dt=cdx_row.datetime,
+ terminal_status_code=cdx_row.status_code,
+ body=body,
+ cdx=cdx_row,
+ revisit_cdx=revisit_cdx,
+ )
+
+
+def fix_transfer_encoding(file_meta: dict, resource: ResourceResult) -> Tuple[dict, ResourceResult]:
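+ """
+ If the fetched body still has gzip transfer encoding applied (file_meta
+ reports 'application/gzip' but the CDX record does not), decompress it and
+ regenerate file metadata for the inner content; otherwise return the
+ (file_meta, resource) pair unchanged.
+ """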
+ if resource.body and file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
+ print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr)
+ inner_body = gzip.decompress(resource.body)
+ if not inner_body:
+ raise Exception("null body inside transfer encoding")
+ inner_resource = ResourceResult(
+ body=inner_body,
+ # copy all other fields
+ start_url=resource.start_url,
+ hit=resource.hit,
+ status=resource.status,
+ terminal_url=resource.terminal_url,
+ terminal_dt=resource.terminal_dt,
+ terminal_status_code=resource.terminal_status_code,
+ cdx=resource.cdx,
+ revisit_cdx=resource.revisit_cdx,
+ )
+ inner_file_meta = gen_file_metadata(inner_resource.body)
+ return (inner_file_meta, inner_resource)
+ else:
+ return (file_meta, resource)
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
new file mode 100644
index 0000000..b852c69
--- /dev/null
+++ b/python/sandcrawler/ingest.py
@@ -0,0 +1,833 @@
+
+import sys
+import json
+import gzip
+import time
+import base64
+import xml.etree.ElementTree
+from collections import namedtuple
+from typing import Optional, Tuple, Any, Dict, List
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+import requests
+from selectolax.parser import HTMLParser
+
+from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding, NoCaptureError
+from sandcrawler.grobid import GrobidClient
+from sandcrawler.pdfextract import process_pdf, PdfExtractResult
+from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime
+from sandcrawler.html import extract_fulltext_url
+from sandcrawler.html_ingest import fetch_html_resources, \
+ quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \
+ WebResource, html_guess_platform
+from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules
+from sandcrawler.workers import SandcrawlerWorker
+from sandcrawler.db import SandcrawlerPostgrestClient
+from sandcrawler.xml import xml_reserialize
+
+
+MAX_BODY_SIZE_BYTES = 128*1024*1024
+
+class IngestFileWorker(SandcrawlerWorker):
+ """
+ High level flow is to look in history first, then go to live web if
+ resource not found. Following redirects is treated as "fetching a
+ resource". Current version fetches a single resource; if it isn't a hit
+ but is an HTML 200, treats it as a landing page, tries to extract
+ fulltext link, then fetches that resource.
+
+ process(request, key=None) -> response
+ Does all the things!
+
+ Check existing processing (short circuit):
+
+ check_existing_ingest(base_url) -> ingest_file_result or none
+ process_existing(result) -> response
+ try fetching all the rows we want. if any don't exist, fetch the resource itself and call process_hit()
+
+ Fetch resource:
+
+ find_resource(url) -> ResourceResult
+
+ Process resource:
+
+ process_hit(ResourceResult) -> response
+ process_grobid(ResourceResult)
+ """
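+
+    # Minimal invocation sketch (the URL is an assumed placeholder; clients and
+    # sinks are default-constructed when not passed in):
+    #
+    #   worker = IngestFileWorker()
+    #   result = worker.process({"ingest_type": "pdf", "base_url": "https://example.com/paper.pdf"})
+    #   result["status"], result["hit"]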
+
+ def __init__(self, sink=None, **kwargs):
+ super().__init__()
+
+ self.sink = sink
+ self.wayback_client = kwargs.get('wayback_client')
+ if not self.wayback_client:
+ self.wayback_client = WaybackClient()
+ self.spn_client = kwargs.get('spn_client')
+ if not self.spn_client:
+ self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0))
+ self.grobid_client = kwargs.get('grobid_client')
+ if not self.grobid_client:
+ self.grobid_client = GrobidClient()
+ self.pgrest_client = kwargs.get('pgrest_client')
+ if not self.pgrest_client:
+ self.pgrest_client = SandcrawlerPostgrestClient()
+ self.grobid_sink = kwargs.get('grobid_sink')
+ self.thumbnail_sink = kwargs.get('thumbnail_sink')
+ self.pdftext_sink = kwargs.get('pdftext_sink')
+ self.xmldoc_sink = kwargs.get('xmldoc_sink')
+ self.htmlteixml_sink = kwargs.get('htmlteixml_sink')
+ self.max_hops = 6
+
+ self.try_existing_ingest = kwargs.get('try_existing_ingest', False)
+ self.try_existing_grobid = kwargs.get('try_existing_grobid', True)
+ self.try_existing_pdfextract = kwargs.get('try_existing_pdfextract', True)
+ self.try_wayback = kwargs.get('try_wayback', True)
+ self.try_spn2 = kwargs.get('try_spn2', True)
+ self.html_quick_mode = kwargs.get('html_quick_mode', False)
+ self.adblock_rules = load_adblock_rules()
+ self.max_html_resources = 200
+
+ self.base_url_blocklist = [
+ # robot blocking
+ "://hkvalidate.perfdrive.com/",
+
+ # temporary, until we implement specific fetch and 'petabox' output
+ "://archive.org/",
+ "://www.archive.org/",
+ "://web.archive.org/web/",
+
+ # out of scope
+ "://openlibrary.org/",
+ "://www.openlibrary.org/",
+ "://fatcat.wiki/",
+ "://orcid.org/",
+ "://doaj.org/",
+
+ # Domain squats
+ "://bartandjones.com",
+ "://ijretm.com",
+ "://ijrcemas.com",
+ "://jist.net.in",
+ "://croisements-revue.org",
+
+ # all stubs/previews, not full papers
+ "://page-one.live.cf.public.springer.com",
+
+ # large datasets-only (no PDF expected)
+ "plutof.ut.ee/",
+ "www.gbif.org/",
+ "doi.pangaea.de/",
+ "www.plate-archive.org/",
+ "://doi.org/10.25642/ipk/gbis/",
+ "://apex.ipk-gatersleben.de/",
+ "fao.org/glis/",
+
+ # Historical non-paper content:
+ "dhz.uni-passau.de/", # newspapers
+ "digital.ucd.ie/", # ireland national historical
+
+ # DOI prefixes
+ "doi.org/10.2307/", # JSTOR; slow and many redirects
+ "doi.org/10.18730/", # fao.org: database entry
+ "doi.org/10.15468/", # gbif.org: database entry
+
+ # deprecated domain (doesn't redirect correctly)
+ "://edoc.mpg.de/",
+ ]
+
+ self.wall_blocklist = [
+ # loginwall
+ "://profile.thieme.de/HTML/sso/ejournals/login.htm",
+ "://login.bepress.com/",
+ "?SAMLRequest=",
+ "://osapublishing.org/captcha/",
+ "/password-login",
+ "://gateway.isiknowledge.com/",
+ "/login?TARGET=",
+ ]
+
+ self.cookie_blocklist = [
+ "/cookieAbsent",
+ "cookieSet=1",
+ "error=cookies_not_supported",
+ ]
+
+ # these are special-case web domains for which we want SPN2 to not run
+ # a headless browser (brozzler), but instead simply run wget.
+ # the motivation could be to work around browser issues, or in the
+ # future possibly to increase download efficiency (wget/fetch being
+ # faster than browser fetch)
+ self.spn2_simple_get_domains = [
+ # direct PDF links
+ "://arxiv.org/pdf/",
+ "://europepmc.org/backend/ptpmcrender.fcgi",
+ "://pdfs.semanticscholar.org/",
+ "://res.mdpi.com/",
+
+ # platform sites
+ "://zenodo.org/",
+ "://figshare.org/",
+ "://springernature.figshare.com/",
+
+ # popular simple cloud storage or direct links
+ "://s3-eu-west-1.amazonaws.com/",
+ ]
+
+ self.src_valid_mimetypes = [
+ "text/x-tex",
+ "application/gzip",
+ "application/x-bzip",
+ "application/x-bzip2",
+ "application/zip",
+ "application/x-tar",
+ "application/msword",
+ "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+ ]
+
+ self.component_valid_mimetypes = [
+ "image/jpeg",
+ "image/tiff",
+ "image/png",
+ "image/gif",
+ "audio/mpeg",
+ "video/mp4",
+ "video/mpeg",
+ "text/plain",
+ "text/csv",
+ "application/json",
+ "application/xml",
+ "application/pdf",
+ "application/gzip",
+ "application/x-bzip",
+ "application/x-bzip2",
+ "application/zip ",
+ "application/x-rar ",
+ "application/x-7z-compressed",
+ "application/x-tar",
+ "application/vnd.ms-powerpoint",
+ "application/vnd.ms-excel",
+ "application/msword",
+ "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+ "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+ ]
+
+
+ def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
+ """
+ Check in sandcrawler-db (postgres) to see if we have already ingested
+ this URL (ingest file result table).
+
+ Returns existing row *if* found *and* we should use it, otherwise None.
+
+ Looks at existing ingest results and makes a decision based on, eg,
+ status and timestamp.
+ """
+ if not self.try_existing_ingest:
+ return None
+ existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url)
+ # TODO: filter on more flags?
+ if existing and existing['hit'] == True:
+ return existing
+ else:
+ return None
+
+ def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]:
+ """
+ Looks in wayback for a resource starting at the URL, following any
+ redirects. If a hit isn't found, try crawling with SPN.
+ """
+ via = "none"
+ resource = None
+
+ if url.startswith("http://web.archive.org/web/") or url.startswith("https://web.archive.org/web/"):
+ raise NotImplementedError("handling direct wayback links not supported yet")
+
+ if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"):
+ raise NotImplementedError("fetching from archive.org not implemented yet")
+
+ if self.try_wayback and not force_recrawl:
+ via = "wayback"
+ resource = self.wayback_client.lookup_resource(url, best_mimetype)
+
+ # check for "soft 404" conditions, where we should retry with live SPNv2
+ soft404 = False
+ # NOTE: these are often not working with SPNv2 either, so disabling. If
+ # we really want to try again, should do force-recrawl
+ #if resource and resource.hit and resource.terminal_url.endswith('/cookieAbsent'):
+ # soft404 = True
+
+ old_failure = False
+ if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000':
+ old_failure = True
+
+        if self.try_spn2 and (resource is None or (resource and resource.status == 'no-capture') or soft404 or old_failure):
+ via = "spn2"
+ force_simple_get = 0
+ for domain in self.spn2_simple_get_domains:
+ if domain in url:
+ force_simple_get = 1
+ break
+ resource = self.spn_client.crawl_resource(url, self.wayback_client, force_simple_get=force_simple_get)
+ print("[FETCH {:>6}] {} {}".format(
+ via,
+ (resource and resource.status),
+ (resource and resource.terminal_url) or url),
+ file=sys.stderr)
+ return resource
+
+ def process_existing(self, request: dict, result_row: dict) -> dict:
+ """
+ If we have an existing ingest file result, do any database fetches or
+ additional processing necessary to return a result.
+ """
+ raise NotImplementedError("process_existing() not tested or safe yet")
+ assert result_row['hit']
+ existing_file_meta = self.pgrest_client.get_file_meta(result_row['terminal_sha1hex'])
+ existing_grobid = self.pgrest_client.get_grobid(result_row['terminal_sha1hex'])
+ existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], result_row['terminal_dt'])
+ if not (existing_file_meta and existing_grobid and existing_cdx):
+            raise NotImplementedError("partially-existing records not implemented yet")
+ result = {
+ 'hit': result_row['hit'],
+ 'status': "existing",
+ 'request': request,
+ 'grobid': existing_grobid,
+ 'file_meta': existing_file_meta,
+ 'cdx': existing_cdx,
+ 'terminal': {
+ 'terminal_url': result_row['terminal_url'],
+ 'terminal_dt': result_row['terminal_dt'],
+ 'terminal_status_code': result_row['terminal_status_code'],
+ 'terminal_sha1hex': result_row['terminal_sha1hex'],
+ },
+ }
+ return result
+
+ def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict:
+ """
+ Run all the necessary processing for a new/fresh ingest hit.
+ """
+ if ingest_type == "pdf":
+ return {
+ 'grobid': self.process_grobid(resource, file_meta),
+ 'pdf_meta': self.process_pdfextract(resource, file_meta),
+ }
+ elif ingest_type == "xml":
+ return {
+ 'xml_meta': self.process_xml(resource, file_meta),
+ }
+ elif ingest_type == "html":
+ html_info = self.process_html(resource, file_meta)
+ # if there is no html_biblio, don't clobber anything possibly extracted earlier
+ if 'html_biblio' in html_info and not html_info['html_biblio']:
+ html_info.pop('html_biblio')
+ return html_info
+ elif ingest_type == "src":
+ return {}
+ elif ingest_type == "component":
+ return {}
+ else:
+ raise NotImplementedError(f"process {ingest_type} hit")
+
+ def process_grobid(self, resource: ResourceResult, file_meta: dict) -> dict:
+ """
+        Submits the resource body to GROBID for processing.
+
+ TODO: By default checks sandcrawler-db for an existing row first, then
+ decide if we should re-process
+ """
+ if self.try_existing_grobid:
+ existing = self.pgrest_client.get_grobid(file_meta['sha1hex'])
+ if existing:
+ print("found existing GROBID result", file=sys.stderr)
+ return existing
+
+        # Need to actually process
+ result = self.grobid_client.process_fulltext(resource.body)
+ if self.grobid_sink:
+ # extra fields for GROBID kafka messages
+ result['file_meta'] = file_meta
+ result['key'] = result['file_meta']['sha1hex']
+ self.grobid_sink.push_record(result.copy())
+ if result['status'] == "success":
+ metadata = self.grobid_client.metadata(result)
+ if metadata:
+                result['metadata'] = metadata
+ result['fatcat_release'] = result['metadata'].pop('fatcat_release', None)
+ result['grobid_version'] = result['metadata'].pop('grobid_version', None)
+ result.pop('tei_xml', None)
+ result.pop('file_meta', None)
+ result.pop('key', None)
+ return result
+
+ def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict:
+ """
+ Extracts thumbnail and pdf_meta info from PDF.
+
+        By default checks sandcrawler-db for an existing row first, then decides
+        whether we should re-process.
+
+ TODO: difference between Kafka schema and SQL/postgrest schema
+ """
+ if self.try_existing_pdfextract:
+ existing = self.pgrest_client.get_pdf_meta(file_meta['sha1hex'])
+ if existing:
+ print("found existing pdf_meta result", file=sys.stderr)
+ result = PdfExtractResult.from_pdf_meta_dict(existing)
+ return result.to_pdftext_dict()
+
+        # Need to actually process
+ result = process_pdf(resource.body)
+ assert result.file_meta['sha1hex'] == file_meta['sha1hex']
+ if self.thumbnail_sink and result.page0_thumbnail is not None:
+ self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
+ if self.pdftext_sink:
+ self.pdftext_sink.push_record(result.to_pdftext_dict(), key=result.sha1hex)
+ result.page0_thumbnail = None
+ result.text = None
+ result.file_meta = None
+ return result.to_pdftext_dict()
+
+ def process_xml(self, resource: ResourceResult, file_meta: dict) -> dict:
+ """
+        Simply publishes to the Kafka topic.
+
+        In the future, could extract other metadata here (like body word
+        count), or attempt to fetch sub-resources.
+ """
+ if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml":
+ try:
+ jats_xml = xml_reserialize(resource.body)
+ except xml.etree.ElementTree.ParseError:
+ return dict(status="xml-parse-error")
+ msg = dict(
+ sha1hex=file_meta["sha1hex"],
+ status="success",
+ jats_xml=jats_xml,
+ )
+ self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex'])
+ return dict(status="success")
+
+ def process_html(self, resource: ResourceResult, file_meta: dict) -> dict:
+
+ assert resource.body
+ try:
+ html_doc = HTMLParser(resource.body)
+ except ValueError as ve:
+ return dict(
+ status="html-selectolax-error",
+ )
+ html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
+ assert html_biblio
+ html_body = html_extract_body_teixml(resource.body)
+ html_platform = html_guess_platform(resource.terminal_url, html_doc, html_biblio)
+ html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('word_count'))
+ html_biblio_dict = json.loads(html_biblio.json(exclude_none=True))
+
+ if html_scope in ('blocked-captcha','blocked-cookie','blocked-forbidden'):
+ return dict(
+ status=html_scope,
+ html_biblio=html_biblio_dict,
+ scope=html_scope,
+ platform=html_platform,
+ )
+ elif html_scope not in ('article-fulltext','unknown',):
+ html_body.pop("tei_xml", None)
+ return dict(
+ status="wrong-scope",
+ html_biblio=html_biblio_dict,
+ scope=html_scope,
+ platform=html_platform,
+ html_body=html_body,
+ )
+
+ raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules)
+ if len(raw_resources) > self.max_html_resources:
+ html_body.pop("tei_xml", None)
+ return dict(
+ status="too-many-resources",
+ html_biblio=html_biblio_dict,
+ scope=html_scope,
+ platform=html_platform,
+ html_body=html_body,
+ )
+
+ if self.htmlteixml_sink and html_body['status'] == "success":
+ self.htmlteixml_sink.push_record(html_body, key=file_meta['sha1hex'])
+
+ html_body.pop("tei_xml", None)
+
+ partial_result = dict(
+ html_biblio=html_biblio_dict,
+ scope=html_scope,
+ platform=html_platform,
+ html_body=html_body,
+ )
+
+ when = parse_cdx_datetime(resource.cdx.datetime)
+ full_resources: List[WebResource] = []
+
+ try:
+ if self.html_quick_mode:
+ print(" WARN: running quick CDX-only fetches", file=sys.stderr)
+ full_resources = quick_fetch_html_resources(raw_resources, self.wayback_client.cdx_client, when)
+ else:
+ full_resources = fetch_html_resources(raw_resources, self.wayback_client, when)
+ except PetaboxError as e:
+ partial_result['status'] = 'petabox-error'
+ partial_result['error_message'] = str(e)[:1600]
+ return partial_result
+ except CdxApiError as e:
+ partial_result['status'] = 'cdx-error'
+ partial_result['error_message'] = str(e)[:1600]
+ return partial_result
+ except WaybackError as e:
+ partial_result['status'] = 'wayback-error'
+ partial_result['error_message'] = str(e)[:1600]
+ return partial_result
+ except WaybackContentError as e:
+ partial_result['status'] = 'wayback-content-error'
+ partial_result['error_message'] = str(e)[:1600]
+ return partial_result
+ except NoCaptureError as e:
+ partial_result['status'] = 'html-resource-no-capture'
+ partial_result['error_message'] = str(e)[:1600]
+ return partial_result
+
+ info = dict(
+ html_body=html_body,
+ html_biblio=html_biblio_dict,
+ scope=html_scope,
+ platform=html_platform,
+ html_resources=[json.loads(r.json(exclude_none=True)) for r in full_resources],
+ )
+ if html_scope == 'unknown':
+ info['status'] = 'unknown-scope'
+ return info
+
+ def timeout_response(self, task: dict) -> dict:
+ print("[TIMEOUT]", file=sys.stderr)
+ return dict(
+ request=task,
+ hit=False,
+ status="timeout",
+ error_message="ingest worker internal timeout",
+ )
+
+ def want(self, request: dict) -> bool:
+        if request.get('ingest_type') not in ('file', 'pdf', 'xml', 'html', 'src', 'component'):
+ return False
+ return True
+
+ def process(self, request: dict, key: Any = None) -> dict:
+
+ # old backwards compatibility
+ if request.get('ingest_type') == 'file':
+ request['ingest_type'] = 'pdf'
+
+ ingest_type = request.get('ingest_type')
+ if ingest_type not in ("pdf", "xml", "html", "src", "component"):
+ raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
+
+ # parse/clean URL
+ # note that we pass through the original/raw URL, and that is what gets
+ # persisted in database table
+ base_url = clean_url(request['base_url'])
+
+ force_recrawl = bool(request.get('force_recrawl', False))
+
+ for block in self.base_url_blocklist:
+ if block in base_url:
+ print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
+ return dict(request=request, hit=False, status="skip-url-blocklist")
+
+ print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
+
+ best_mimetype = None
+ if ingest_type == "pdf":
+ best_mimetype = "application/pdf"
+ elif ingest_type == "xml":
+ best_mimetype = "text/xml"
+ elif ingest_type == "html":
+ best_mimetype = "text/html"
+ elif ingest_type == "src":
+ best_mimetype = "application/gzip"
+
+ existing = self.check_existing_ingest(ingest_type, base_url)
+ if existing:
+ return self.process_existing(request, existing)
+
+ result: Dict[str, Any] = dict(request=request, hit=False)
+
+ next_url = base_url
+ hops = [base_url]
+
+ while len(hops) <= self.max_hops:
+
+ result['hops'] = hops
+
+ # check against blocklist again on each hop
+ for block in self.base_url_blocklist:
+ if block in next_url:
+ result['status'] = "skip-url-blocklist"
+ return result
+
+ # check against known loginwall URLs
+ for block in self.wall_blocklist:
+ if block in next_url:
+ # TODO: blocked-wall instead of skip-wall
+ result['status'] = "skip-wall"
+ return result
+
+ # check for popular cookie blocking URL patterns. On successful SPN
+ # crawls, shouldn't see these redirect URLs
+ for pattern in self.cookie_blocklist:
+ if pattern in next_url:
+ result['status'] = 'blocked-cookie'
+ return result
+
+ try:
+ resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl)
+ except SavePageNowError as e:
+ result['status'] = 'spn2-error'
+ result['error_message'] = str(e)[:1600]
+ return result
+ except PetaboxError as e:
+ result['status'] = 'petabox-error'
+ result['error_message'] = str(e)[:1600]
+ return result
+ except CdxApiError as e:
+ result['status'] = 'cdx-error'
+ result['error_message'] = str(e)[:1600]
+ # add a sleep in cdx-error path as a slow-down
+ time.sleep(2.0)
+ return result
+ except WaybackError as e:
+ result['status'] = 'wayback-error'
+ result['error_message'] = str(e)[:1600]
+ return result
+ except WaybackContentError as e:
+ result['status'] = 'wayback-content-error'
+ result['error_message'] = str(e)[:1600]
+ return result
+ except NotImplementedError as e:
+ result['status'] = 'not-implemented'
+ result['error_message'] = str(e)[:1600]
+ return result
+
+ assert resource
+
+ if resource.terminal_url:
+ result['terminal'] = {
+ "terminal_url": resource.terminal_url,
+ "terminal_dt": resource.terminal_dt,
+ "terminal_status_code": resource.terminal_status_code,
+ }
+ if resource.terminal_url not in result['hops']:
+ result['hops'].append(resource.terminal_url)
+
+ if not resource.hit:
+ result['status'] = resource.status
+ return result
+
+ if resource.terminal_url:
+ for pattern in self.base_url_blocklist:
+ if pattern in resource.terminal_url:
+ result['status'] = 'skip-url-blocklist'
+ return result
+
+ if resource.terminal_url:
+ for pattern in self.cookie_blocklist:
+ if pattern in resource.terminal_url:
+ result['status'] = 'blocked-cookie'
+ return result
+
+ if not resource.body:
+ result['status'] = 'null-body'
+ return result
+
+ if len(resource.body) > MAX_BODY_SIZE_BYTES:
+ result['status'] = 'body-too-large'
+ return result
+
+ file_meta = gen_file_metadata(resource.body)
+ try:
+ file_meta, resource = fix_transfer_encoding(file_meta, resource)
+ except Exception as e:
+ result['status'] = 'bad-gzip-encoding'
+ result['error_message'] = str(e)
+ return result
+
+ if not resource.body or file_meta['size_bytes'] == 0:
+ result['status'] = 'null-body'
+ return result
+
+ # here we split based on ingest type to try and extract a next hop
+ html_ish_resource = bool(
+ "html" in file_meta['mimetype']
+ or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
+ or "application/xml" in file_meta['mimetype']
+ or "text/xml" in file_meta['mimetype']
+ )
+ html_biblio = None
+ html_doc = None
+ if html_ish_resource and resource.body:
+ try:
+ html_doc = HTMLParser(resource.body)
+ html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
+ if html_biblio:
+ if not 'html_biblio' in result or html_biblio.title:
+ result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True))
+ #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
+ except ValueError:
+ pass
+
+ if ingest_type == "pdf" and html_ish_resource:
+
+ # the new style of URL extraction (already computed)
+ if html_biblio and html_biblio.pdf_fulltext_url:
+ fulltext_url = dict(
+ pdf_url=html_biblio.pdf_fulltext_url,
+ technique="html_biblio",
+ )
+ else:
+ fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
+
+ result['extract_next_hop'] = fulltext_url
+ if not fulltext_url:
+ result['status'] = 'no-pdf-link'
+ return result
+ next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url') or ""
+ assert next_url
+ next_url = clean_url(next_url)
+ print("[PARSE {:>6}] {} {}".format(
+ ingest_type,
+ fulltext_url.get('technique'),
+ next_url,
+ ),
+ file=sys.stderr)
+ if next_url in hops:
+ result['status'] = 'link-loop'
+ result['error_message'] = "repeated: {}".format(next_url)
+ return result
+ hops.append(next_url)
+ continue
+ elif ingest_type in ("xml", "html", "component") and html_ish_resource and html_biblio:
+ # NOTE: src_fulltext_url is not a thing
+ next_url_found = None
+ if ingest_type == "xml" and html_biblio.xml_fulltext_url:
+ next_url_found = html_biblio.xml_fulltext_url
+ elif ingest_type == "html" and html_biblio.html_fulltext_url:
+ next_url_found = html_biblio.html_fulltext_url
+ elif ingest_type == "component" and html_biblio.component_url:
+ next_url_found = html_biblio.component_url
+
+ if next_url_found:
+ next_url = next_url_found
+ technique = "html_biblio"
+ print("[PARSE {:>6}] {} {}".format(
+ ingest_type,
+ technique,
+ next_url,
+ ),
+ file=sys.stderr)
+ if next_url in hops:
+ if ingest_type == "html":
+ # for HTML ingest, we don't count this as a link-loop
+ break
+ result['status'] = 'link-loop'
+ result['error_message'] = "repeated: {}".format(next_url)
+ return result
+ hops.append(next_url)
+ continue
+
+ # default is to NOT keep hopping
+ break
+
+ if len(hops) >= self.max_hops:
+ result['status'] = "max-hops-exceeded"
+ return result
+
+ # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
+ assert resource
+ assert resource.hit == True
+ assert resource.terminal_status_code in (200, 226)
+
+ if resource.terminal_url:
+ result['terminal'] = {
+ "terminal_url": resource.terminal_url,
+ "terminal_dt": resource.terminal_dt,
+ "terminal_status_code": resource.terminal_status_code,
+ "terminal_sha1hex": file_meta['sha1hex'],
+ }
+
+ result['file_meta'] = file_meta
+ result['cdx'] = cdx_to_dict(resource.cdx)
+ if resource.revisit_cdx:
+ result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
+
+ if ingest_type == "pdf":
+ if file_meta['mimetype'] != "application/pdf":
+ result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
+ return result
+ elif ingest_type == "xml":
+ if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
+ result['status'] = "wrong-mimetype"
+ return result
+ elif ingest_type == "html":
+ if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"):
+ result['status'] = "wrong-mimetype"
+ return result
+ elif ingest_type == "src":
+ if file_meta['mimetype'] not in self.src_valid_mimetypes:
+ result['status'] = "wrong-mimetype"
+ return result
+ elif ingest_type == "component":
+ if file_meta['mimetype'] not in self.component_valid_mimetypes:
+ result['status'] = "wrong-mimetype"
+ return result
+ else:
+ raise NotImplementedError()
+
+ info = self.process_hit(ingest_type, resource, file_meta)
+ result.update(info)
+
+ # check if processing turned up an error
+ if info.get('status') not in ('success', None):
+ result['status'] = info['status']
+ return result
+
+ result['status'] = "success"
+ result['hit'] = True
+ if ingest_type == "pdf":
+ print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format(
+ ingest_type,
+ result.get('file_meta', {}).get('sha1hex'),
+ result.get('grobid', {}).get('status_code'),
+ result.get('pdf_meta', {}).get('status'),
+ ),
+ file=sys.stderr)
+ else:
+ print("[SUCCESS {:>5}] sha1:{}".format(
+ ingest_type,
+ result.get('file_meta', {}).get('sha1hex'),
+ ),
+ file=sys.stderr)
+ return result
+
+
+class IngestFileRequestHandler(BaseHTTPRequestHandler):
+ def do_POST(self):
+ if self.path != "/ingest":
+ self.send_response(404)
+ self.end_headers()
+            self.wfile.write(b"404: Not Found")
+ return
+ length = int(self.headers.get('content-length'))
+ request = json.loads(self.rfile.read(length).decode('utf-8'))
+ print("Got request: {}".format(request))
+ ingester = IngestFileWorker()
+ result = ingester.process(request)
+ self.send_response(200)
+ self.end_headers()
+        self.wfile.write(json.dumps(result).encode('utf-8'))
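+
+# Rough serving sketch for the handler above (host/port are assumptions; the
+# usual entry point is the sandcrawler CLI tooling, not this snippet):
+#
+#   server = HTTPServer(("localhost", 8083), IngestFileRequestHandler)
+#   server.serve_forever()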
diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py
new file mode 100644
index 0000000..c7deea1
--- /dev/null
+++ b/python/sandcrawler/minio.py
@@ -0,0 +1,99 @@
+
+import io
+import os
+import hashlib
+
+import minio
+
+
+class SandcrawlerMinioClient(object):
+
+ def __init__(self, host_url, access_key, secret_key, default_bucket=None):
+ """
+ host is minio connection string (host:port)
+ access and secret key are as expected
+ default_bucket can be supplied so that it doesn't need to be repeated for each function call
+
+ Example config:
+
+ host="localhost:9000",
+ access_key=os.environ['SANDCRAWLER_BLOB_ACCESS_KEY'],
+            secret_key=os.environ['SANDCRAWLER_BLOB_SECRET_KEY'],
+ """
+ self.mc = minio.Minio(
+ host_url,
+ access_key=access_key,
+ secret_key=secret_key,
+ secure=False,
+ )
+ self.default_bucket = default_bucket
+
+ def _blob_path(self, folder, sha1hex: str, extension: str, prefix):
+ if not extension:
+ extension = ""
+ if not prefix:
+ prefix = ""
+ assert len(sha1hex) == 40
+ obj_path = "{}{}/{}/{}/{}{}".format(
+ prefix,
+ folder,
+ sha1hex[0:2],
+ sha1hex[2:4],
+ sha1hex,
+ extension,
+ )
+ return obj_path
+
+ def put_blob(self, folder, blob, sha1hex=None, extension="", prefix="", bucket=None):
+ """
+ blob should be bytes
+ sha1hex is assumed to be sha1 of the blob itself; if not supplied it will be calculated
+ Uploads blob to path in the given bucket. Files are stored in a top-level
+ folder, then in two levels of sub-directory based on sha1, then the
+ filename is SHA1 with an optional file extension.
+ """
+ if type(blob) == str:
+ blob = blob.encode('utf-8')
+ assert type(blob) == bytes
+ if not sha1hex:
+ h = hashlib.sha1()
+ h.update(blob)
+ sha1hex = h.hexdigest()
+ obj_path = self._blob_path(folder, sha1hex, extension, prefix)
+ if not bucket:
+ bucket = self.default_bucket
+ assert bucket
+ content_type = "application/octet-stream"
+ if extension.endswith('.xml'):
+ content_type = "application/xml"
+ if extension.endswith('.png'):
+ content_type = "image/png"
+ elif extension.endswith('.jpg') or extension.endswith('.jpeg'):
+ content_type = "image/jpeg"
+ elif extension.endswith('.txt'):
+ content_type = "text/plain"
+ self.mc.put_object(
+ bucket,
+ obj_path,
+ io.BytesIO(blob),
+ len(blob),
+ content_type=content_type,
+ )
+ return (bucket, obj_path)
+
+ def get_blob(self, folder, sha1hex, extension="", prefix="", bucket=None):
+ """
+ sha1hex is sha1 of the blob itself
+
+        Fetches the blob from the given bucket/folder, using the sandcrawler SHA-1 path convention.
+ """
+ obj_path = self._blob_path(folder, sha1hex, extension, prefix)
+ if not bucket:
+ bucket = self.default_bucket
+ assert bucket
+ blob = self.mc.get_object(
+ bucket,
+ obj_path,
+ )
+ # TODO: optionally verify SHA-1?
+ return blob
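+
+# Usage sketch (host, keys, and bucket names are placeholder assumptions):
+#
+#   mc = SandcrawlerMinioClient("localhost:9000", "ACCESSKEY", "SECRETKEY", default_bucket="sandcrawler")
+#   bucket, path = mc.put_blob("grobid", b"<TEI/>", extension=".tei.xml")
+#   # path follows the "grobid/<sha1[0:2]>/<sha1[2:4]>/<sha1hex>.tei.xml" convention
+#   resp = mc.get_blob("grobid", sha1hex, extension=".tei.xml")   # sha1hex of the blob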
diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py
new file mode 100644
index 0000000..a3e2960
--- /dev/null
+++ b/python/sandcrawler/misc.py
@@ -0,0 +1,222 @@
+
+import base64
+import magic
+import hashlib
+import datetime
+from typing import Optional
+
+import requests
+from requests.adapters import HTTPAdapter
+from requests.packages.urllib3.util.retry import Retry # pylint: disable=import-error
+import urlcanon
+
+
+def clean_url(s: str) -> str:
+ s = s.strip()
+ parsed = urlcanon.parse_url(s)
+ if not parsed.port and parsed.colon_before_port:
+ parsed.colon_before_port = b''
+ return str(urlcanon.whatwg(parsed))
+
+def url_fuzzy_equal(left: str, right: str) -> bool:
+ """
+ TODO: use proper surt library and canonicalization for this check
+ """
+ fuzzy_left = '://'.join(clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:])
+ fuzzy_right = '://'.join(clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:])
+ if fuzzy_left == fuzzy_right:
+ return True
+ elif fuzzy_left == fuzzy_right + "/" or fuzzy_right == fuzzy_left + "/":
+ return True
+ return False
+
+def test_url_fuzzy_equal() -> None:
+ assert True == url_fuzzy_equal(
+ "http://www.annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree",
+ "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree")
+
+def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
+ """
+ Takes a file blob (bytestream) and returns hashes and other metadata.
+
+ Returns a dict: size_bytes, md5hex, sha1hex, sha256hex, mimetype
+ """
+ assert blob is not None
+ if not allow_empty:
+ assert blob
+ mimetype = magic.Magic(mime=True).from_buffer(blob)
+ if mimetype in ("application/xml", "text/xml"):
+ # crude checks for XHTML or JATS XML, using only first 1 kB of file
+ if b"<htm" in blob[:1024] and b'xmlns="http://www.w3.org/1999/xhtml"' in blob[:1024]:
+ mimetype = "application/xhtml+xml"
+ elif b"<article " in blob[:1024] and not b"<html" in blob[:1024]:
+ mimetype = "application/jats+xml"
+ hashes = [
+ hashlib.sha1(),
+ hashlib.sha256(),
+ hashlib.md5(),
+ ]
+ for h in hashes:
+ h.update(blob)
+ return dict(
+ size_bytes=len(blob),
+ sha1hex=hashes[0].hexdigest(),
+ sha256hex=hashes[1].hexdigest(),
+ md5hex=hashes[2].hexdigest(),
+ mimetype=mimetype,
+ )
+
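+# Inline-test sketch in the same style as the other helpers in this module,
+# using a tiny arbitrary blob (mimetype is not asserted, since it depends on
+# the local libmagic database):
+def test_gen_file_metadata_basic() -> None:
+    info = gen_file_metadata(b"hello world")
+    assert info['size_bytes'] == 11
+    assert info['sha1hex'] == hashlib.sha1(b"hello world").hexdigest()
+    assert info['md5hex'] == hashlib.md5(b"hello world").hexdigest()
+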
+def b32_hex(s: str) -> str:
+ """
+ Converts a base32-encoded SHA-1 checksum into hex-encoded
+
+ base32 checksums are used by, eg, heritrix and in wayback CDX files
+ """
+ s = s.strip().split()[0].lower()
+ if s.startswith("sha1:"):
+ s = s[5:]
+ if len(s) != 32:
+ if len(s) == 40:
+ return s
+ raise ValueError("not a base-32 encoded SHA-1 hash: {}".format(s))
+ return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
+
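+# Round-trip check for b32_hex(), following the inline-test convention used in
+# this module (the hex digest below is an arbitrary placeholder value):
+def test_b32_hex_roundtrip() -> None:
+    hex_in = "0123456789abcdef0123456789abcdef01234567"  # 40 hex chars, SHA-1 sized
+    b32 = base64.b32encode(bytes.fromhex(hex_in)).decode('utf-8')
+    assert b32_hex("sha1:" + b32) == hex_in
+    # a 40-character input is assumed to already be hex and passed through as-is
+    assert b32_hex(hex_in) == hex_in
+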
+NORMAL_MIME = (
+ 'application/pdf',
+ 'application/postscript',
+ 'text/html',
+ 'text/xml',
+ 'application/octet-stream',
+)
+
+def normalize_mime(raw: str) -> Optional[str]:
+ raw = raw.lower().strip()
+ for norm in NORMAL_MIME:
+ if raw.startswith(norm):
+ return norm
+
+ # Special cases
+ if raw.startswith('application/xml'):
+ return 'text/xml'
+ if raw.startswith('application/x-pdf'):
+ return 'application/pdf'
+ if raw in (
+ '.pdf',
+ ):
+ return 'application/pdf'
+ if raw in (
+ 'application/download',
+ 'binary/octet-stream',
+ 'unk',
+ 'application/x-download',
+ 'application/octetstream',
+ 'application/force-download',
+ 'application/unknown',
+ ):
+ return 'application/octet-stream'
+ return None
+
+
+def test_normalize_mime():
+ assert normalize_mime("asdf") is None
+ assert normalize_mime("application/pdf") == "application/pdf"
+ assert normalize_mime("application/pdf+journal") == "application/pdf"
+ assert normalize_mime("Application/PDF") == "application/pdf"
+ assert normalize_mime("application/p") is None
+ assert normalize_mime("application/xml+stuff") == "text/xml"
+ assert normalize_mime("application/x-pdf") == "application/pdf"
+ assert normalize_mime("application/x-html") is None
+ assert normalize_mime("unk") == "application/octet-stream"
+ assert normalize_mime("binary/octet-stream") == "application/octet-stream"
+
+
+def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]:
+ """
+ This method always filters a few things out:
+
+ - non-HTTP requests, based on lack of status code (eg, whois)
+ """
+
+ cdx = raw_cdx.split()
+ if len(cdx) < 11:
+ return None
+
+ surt = cdx[0]
+ dt = cdx[1]
+ url = cdx[2]
+ mime = normalize_mime(cdx[3])
+ http_status = cdx[4]
+ sha1b32 = cdx[5]
+ c_size = cdx[8]
+ offset = cdx[9]
+ warc = cdx[10]
+
+ if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit()
+ and len(sha1b32) == 32 and dt.isdigit()):
+ return None
+
+ if '-' in (surt, dt, url, http_status, sha1b32, c_size, offset, warc):
+ return None
+
+ if mime is None or mime == '-':
+ mime = "application/octet-stream"
+
+ if normalize:
+ mime = normalize_mime(mime)
+
+ sha1hex = b32_hex(sha1b32)
+
+ return dict(
+ surt=surt,
+ url=url,
+ datetime=dt,
+ mimetype=mime,
+ http_status=int(http_status),
+ sha1b32=sha1b32,
+ sha1hex=sha1hex,
+ warc_csize=int(c_size),
+ warc_offset=int(offset),
+ warc_path=warc,
+ )
+
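+# Inline-test sketch matching the style above; the CDX line is a fabricated
+# example and the base32 SHA-1 is an arbitrary valid-looking placeholder:
+def test_parse_cdx_line_basic() -> None:
+    line = "org,example)/paper.pdf 20201028235103 http://example.org/paper.pdf application/pdf 200 ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEF - - 1234 5678 example-0000.warc.gz"
+    row = parse_cdx_line(line)
+    assert row is not None
+    assert row['http_status'] == 200
+    assert row['mimetype'] == 'application/pdf'
+    assert row['warc_csize'] == 1234
+    assert row['warc_path'] == 'example-0000.warc.gz'
+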
+def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]:
+ if not dt_str:
+ return None
+ try:
+ return datetime.datetime.strptime(dt_str, "%Y%m%d%H%M%S")
+ except Exception:
+ return None
+
+def test_parse_cdx_datetime() -> None:
+ assert parse_cdx_datetime("") == None
+ assert parse_cdx_datetime("asdf") == None
+ assert parse_cdx_datetime("19930203123045") != None
+ assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)
+
+def datetime_to_cdx(dt: datetime.datetime) -> str:
+ return '%04d%02d%02d%02d%02d%02d' % (
+ dt.year, dt.month, dt.day,
+ dt.hour, dt.minute, dt.second,
+ )
+
+def test_datetime_to_cdx() -> None:
+ assert "20201028235103" == datetime_to_cdx(datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3))
+
+def requests_retry_session(retries=10, backoff_factor=3,
+ status_forcelist=(500, 502, 504), session=None) -> requests.Session:
+ """
+ From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
+ """
+ session = session or requests.Session()
+ retry = Retry(
+ total=retries,
+ read=retries,
+ connect=retries,
+ backoff_factor=backoff_factor,
+ status_forcelist=status_forcelist,
+ )
+ adapter = HTTPAdapter(max_retries=retry)
+ session.mount('http://', adapter)
+ session.mount('https://', adapter)
+ return session
+
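+# Quick sanity check in the same inline-test style: the configured session
+# should come back with retry-capable adapters mounted for both URL schemes.
+def test_requests_retry_session() -> None:
+    session = requests_retry_session(retries=2, backoff_factor=0.1)
+    assert isinstance(session.get_adapter("http://example.com"), HTTPAdapter)
+    assert isinstance(session.get_adapter("https://example.com"), HTTPAdapter)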
diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py
new file mode 100644
index 0000000..9b4e834
--- /dev/null
+++ b/python/sandcrawler/pdfextract.py
@@ -0,0 +1,470 @@
+
+import sys
+import json
+import datetime
+from io import BytesIO
+from dataclasses import dataclass
+from typing import Optional, Dict, Any
+
+import poppler
+from PIL import Image
+
+from .workers import SandcrawlerWorker, SandcrawlerFetchWorker
+from .misc import gen_file_metadata
+
+
+# This is a hack to work around timeouts when processing certain PDFs with
+# poppler. For some reason, the usual Kafka timeout catcher isn't working on
+# these, maybe due to threading.
+BAD_PDF_SHA1HEX = [
+ "011478a1e63a2a31eae1a93832a74cc95f220760",
+ "018dfe9824de6d2ac068ce0f7dc9961bffa1b558",
+ "057c7a9dfb611bfd52f7de6c39b2d5757c5e4e53",
+ "06061af0707298c12932516d1bb7c2b6dc443824",
+ "0641822e68c5a07538b967489fd19a1d5dc371a5",
+ "09cba9b00494d12759c50cb914f1fb7c9746f5d1",
+ "09db7c9f2efb496c974427a61e84292ae27fc702",
+ "0a1c13cb8783bbbf248b2345b9890e2410aa3f0a",
+ "0ccc6dc94f4e2d809fac8543870265c3421f3c9e",
+ "0d1c1567ea70e7b922ba88ccb868ffc7ca18e75c",
+ "10c6577a658bf6203557e2998b25ea9788f8adfe",
+            "application/zip",
+            "application/x-rar",
+ "17e679b0ec9444fff2ea4d02caec05dd2de80ec3",
+ "182749ad1db1d5e999d07f010bdcfc2978dadc88",
+ "1a17a4fc43397804830cc29021281aac2e8cf0cb",
+ "1cb166f0c0b5ffe673e6bbf6a29d77278711f253",
+ "1d04e46b6848e6479dd90fe26bb11627044fb664",
+ "1d967c95546d31edaaf0c3ef9ffcc11113a9e11a",
+ "1f90194bf0c7fff1fe1ed5fff77a934c7a1b32a0",
+ "20589d9dd0a22c8c938ad97b7f4f12648aa119fa",
+ "2195e528fa1cf5f8ae3b2adcc516896016c3411f",
+ "25ab9e6169f041be05844a9b4edd6574918af769",
+ "281de904c4642a9be4f17b9774fc0a2bdc8a90e3",
+ "2bd5322975653536550a039eb055174b2bf241b3",
+ "2fc64da736175810918fd32c94c5068b0d660bcc",
+ "32318fba9b05b2756b7362bcaa4722c92ed8d449",
+ "336833c6fc968cd0938250dfc93c032a30111cfc",
+ "362ad00bc24d650c8f11851f9e554fc560b73e7a",
+ "373f84dfab4ed47047826e604e2918a9cd6a95b2",
+ "3ac0b6e17e30d141871a0a5b127536919fe5aa19",
+ "3c8a6a708da0dc1802f5f3e5267a49b3c25e1ffe",
+ "3e5f9fb94e7314447a22f3d009419a922136177f",
+ "3fad493c940137ce703f2f570ebb504e360c6df3",
+ "40aa94602ab13e5a7d9df8c989fca4fa5c01239e",
+ "427479c94d7d0e512f898bc7ff0b6f210069f902",
+ "436c9183724f051b22c96285aa8ff1d2ba709574",
+ "43a8c0abf0386d3e3397cf5e22a884761dd63db7",
+ "445968ef735b228c08c3ff4238d99fc9f4824619",
+ "447fa6b5a90742a86429a932f6608d8e141688c0",
+ "45f014d7d631559dc7726e5c5513f1e7c91c48a9",
+ "47577ff6d6876117ca69bec60a5764f7d2c2ec70",
+ "4785181cec8944eee00ddb631a5dfc771b89bab7",
+ "47db2db2cc976429568841a0496c0ab4ed7b5977",
+ "481c0bae81873988fcc8662ba8a269e8823fdea2",
+ "4c81129904f7976a50825595a3497ea7b52579ef",
+ "4edc1402712fa6827c4501fed8042e9f4447829c",
+ "50b3c5a3122272aca69855ef06b85d0b43a76eb1",
+ "52fc9b3c5199ef395d410c7cee5961dc812e4d29",
+ "53471346019947a88c1ba141fb829375527153b0",
+ "58d9ae7dcb0a7dbbdfc58ad266030b037e9cd0ff",
+ "59cfc843ebdb1c1e5db1efc76a40f46cb3bb06f0",
+ "5ab98405b676ee81a6ca74fba51a9e4a6cff7311",
+ "5e04779cbbae5ce88bb786064f756885dd6895fe",
+ "5e6a3adde9f08c276c4efd72bfacb256f2ec35d9",
+ "623ff84b616383d0a3e0dd8dbce12f0b5fe9a6ac",
+ "646c4a654270606256397684204ff0f3d17be2e7",
+ "64d821d728f9a3dc944b4c03be00feea0b57e314",
+ "689b5cb3ddef213d612363a903f10d0358ea64d2",
+ "6909f0b62d8b7835de3dec7777aad7f8ef507ee3",
+ "74e617dc95555e8ca3aadd19d0c85b71cd77d1d9",
+ "75c2662a96ccc48891228df7c85eb7d4da9dd621",
+ "771f1ca0007a6fbed5b4a434c73f524f715d33c1",
+ "776859635e9dc01d97b0582f49c814ffbcb019fb",
+ "781dafda896a9f5c30f3d0a011f79a3b79b574c4",
+ "788672c7c2bcdecf6e2f6a2177c01e60f04d9cfb",
+ "79d6cba3c6e577a0f3a3a9fe575680d38454938d",
+ "7cfc0739be9c49d94272110a0a748256bdde9be6",
+ "7daf61526ec825151f384cc1db510ca5237d5d80",
+ "7e9d846f3bf9ce15cdb991b78cc870ab8a2bed76",
+ "8398b211a5ec4da1195a4ba1bc29ca8c0ac40f67",
+ "859d7ec532a0bf3b52b17c7f2d8ecc58410c0aad",
+ "88edcbab1cac2d70af5870422974afc253f4f0c6",
+ "89860fc475fcb2a2d86c4544df52ec8fd5e6533f",
+ "8dcaf4ef132900dd378f7be526c884b17452713b",
+ "8e4f03c29ae1fe7227140ab4b625f375f6c00d31",
+ "949dfb7d833da9576b2ccb9eb1ab5457469c53d3",
+ "961ec451172f373f919c593737466300e42062cb",
+ "976989fa6e447578d9ce16ec5b526f0e09d6df50",
+ "98b02eb70066c182c705ef4d14d8b723ad7f1fab",
+ "993ca31f6974f8387bb18dd7d38987d290da8781",
+ "9dbd05af3442e6f42d67868054751b76973f4171",
+ "a2298c137b9c8c8975bad62eea9224edb95e6952",
+ "a2671738755ab8b24775e95375dc72f1ca4e5fd6",
+ "a26f299fb97c646effeebd4c5e2968786bd0f781",
+ "a48f9b7ad627909f76d780aa4208530304ece42c",
+ "a69665d0b5d3b95f54f68406eee3ed50c67efb45",
+ "a8357c31837404f9ebd798999d546c9398ab3648",
+ "a9162b9aef5e5da0897275fede1a6cff8cc93dfc",
+ "ad038725bf6855a79f3c768ebe93c7103d14522f",
+ "aef581bf42e76e527f5aed3b8958fd4e7a24819f",
+ "b2b66b9c7f817a20144456f99c0be805602e8597",
+ "b2d719120306b90eb8dd3580b699a61ec70556f4",
+ "b4b8e18e27f102e59b2be2d58c7b54d0a0eb457a",
+ "b5be7f409a3a2601208c5ce08cf52b9ac1094aae",
+ "b5bf8b7467fb095c90adf3b49aa1687291e4469c",
+ "b8b427e5b3d650ba9e03197f9c3917e25b878930",
+ "bad48b89b639b5b7df2c6a2d5288181fcb8b0e35",
+ "be0cda7642e9247b3ee41cd2017fa709aab4f344",
+ "c1b583fbd052572f08158d39ffe4d7510dadbebb",
+ "c2526f75a013dc67b14ce1e2d0e4fc80bb93c6e1",
+ "c4abbb284f4acaca9e8ceb88f842901984e84d33",
+ "c7220d1bf1e71fb755d9f26bbdd4c539dc162960",
+ "c7687fa6f637c7d32a25be0e772867d87536d35c",
+ "c7d8b37ec99cf0d987e60667f05299f200e18a5d",
+ "c92b9ae9eefa07504950b405625aef54b48f0e1a",
+ "ccb1debcfae006a3fc984e9e91309b9706a5c375",
+ "cd611c765cbb0b3b7cb2fdc07d8f0b9cc93ec257",
+ "cd8a7c3b8d850ebedc1ca791ccb37b9a2689f9c3",
+ "d055c054c330f99ec011e37186d2b429339758fd",
+ "d17b1e254cce82df5c6eb4fd492cef91e7e11558",
+ "d188762a7e3ab5d4ee8a897204316513e4e636ec",
+ "d613b9e4442f5d5d19ea6814fa9729bff7da7c85",
+ "d6b0f405bf13c23d0e90c54eea527442786d1cd3",
+ "da2211ee2dbc6dda36571976d810e2366a3d2504",
+ "e01bb7256d77aea258313bb410dfcfc10512f420",
+ "e2bf5d0a5885359381fe8ef2cd9290171d494e9b",
+ "e2c3b8a2cf33d5e8972bc9ddb78373766a75e412",
+ "e64714a81f60ab9286ec90cad682cb22e564fb6f",
+ "e9d7716b4f94bbc3d94459b5fe9bb8b15cb2e433",
+ "e9e84e17383e93a784a8471708619162b32fb399",
+ "eac7df5f799983d5a7cc55d10b4d426dc557febf",
+ "eaf84b2efd2f69c7b3f407f89ea66ac4c41fac36",
+ "eb1b39fd7a874896688855a22efddef10272427c",
+ "eb5fffaa590a52bcc3705b888c6ff9c4dc4c45b2",
+ "edf8dcc8736f06afbaca0e01d60bd2c475403a3d",
+ "ee2ee6ae2cf05128810d0d95bbe69bd263e140de",
+ "ee9530a2c5a3d1e3813ccb51a55cc8b0d9b5dfc7",
+ "ef1dfa325c21cff4cd8bb1a9b6c4ee6996d43c8f",
+ "ef6749d9263a01f921ba7d72df0d17671d14e5f6",
+ "f0ea221d8587cede25592266486e119d277f7096",
+ "f68f9a9202a75d2aee35252e104d796f9515001e",
+ "f9314d3bf2eac78a7d78d18adcccdb35542054ef",
+ "fd9bd560662e070b222d63052830837829c490f0",
+]
+
+@dataclass
+class PdfExtractResult:
+ sha1hex: str
+ status: str
+ error_msg: Optional[str] = None
+ file_meta: Optional[Dict[str,Any]] = None
+ text: Optional[str] = None
+ page0_thumbnail: Optional[bytes] = None
+ has_page0_thumbnail: bool = False
+ meta_xml: Optional[str] = None
+ pdf_info: Optional[Dict[str,Any]] = None
+ pdf_extra: Optional[Dict[str,Any]] = None
+ source: Optional[Dict[str,Any]] = None
+
+ def to_pdftext_dict(self) -> dict:
+ """
+        Outputs a dict, as would be serialized to JSON and published to the Kafka text/info topic.
+ """
+ return {
+ 'key': self.sha1hex,
+ 'sha1hex': self.sha1hex,
+ 'status': self.status,
+ 'file_meta': self.file_meta,
+ 'error_msg': self.error_msg,
+ 'text': self.text,
+ 'has_page0_thumbnail': self.has_page0_thumbnail,
+ 'meta_xml': self.meta_xml,
+ 'pdf_info': self.pdf_info,
+ 'pdf_extra': self.pdf_extra,
+ 'source': self.source,
+ }
+
+ @classmethod
+ def from_pdftext_dict(cls, record):
+ """
+        Parses a dict, as consumed from the Kafka text/info topic, back into a PdfExtractResult.
+ """
+ if record['status'] != 'success':
+ return PdfExtractResult(
+ sha1hex=record.get('sha1hex') or record['key'],
+ status=record['status'],
+ error_msg=record.get('error_msg'),
+ )
+ else:
+ return PdfExtractResult(
+ sha1hex=record['sha1hex'],
+ status=record['status'],
+ file_meta=record.get('file_meta'),
+ text=record.get('text'),
+ has_page0_thumbnail=bool(record.get('has_page0_thumbnail', False)),
+ meta_xml=record.get('meta_xml'),
+ pdf_info=record.get('pdf_info'),
+ pdf_extra=record.get('pdf_extra'),
+ )
+
+ @classmethod
+ def from_pdf_meta_dict(cls, record):
+ """
+ Parses what would be returned from postgrest
+ """
+ if record['status'] != 'success':
+ return PdfExtractResult(
+ sha1hex=record['sha1hex'],
+ status=record['status'],
+ error_msg=(record.get('metadata') or {}).get('error_msg'),
+ )
+ else:
+ pdf_extra = dict()
+ for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id', 'pdf_version'):
+ if record.get(k):
+ pdf_extra[k] = record[k]
+ return PdfExtractResult(
+ sha1hex=record['sha1hex'],
+ status=record['status'],
+ has_page0_thumbnail=bool(record.get('has_page0_thumbnail', False)),
+ pdf_info=record.get('metadata'),
+ pdf_extra=pdf_extra,
+ )
+
+ def to_sql_tuple(self) -> tuple:
+ # pdf_meta (sha1hex, updated, status, page0_thumbnail, page_count,
+ # word_count, page0_height, page0_width, permanent_id, pdf_created,
+ # pdf_version, metadata)
+ word_count: Optional[int] = None
+ if self.text:
+ word_count = len(self.text.split())
+ metadata: Optional[Dict] = None
+ pdf_extra = self.pdf_extra or dict()
+ pdf_created = None
+ # TODO: form, encrypted
+ if self.pdf_info:
+ metadata = dict()
+ for k in ('Title', 'Subject', 'Author', 'Creator', 'Producer', 'doi'):
+ if k in self.pdf_info:
+ metadata[k.lower()] = self.pdf_info[k]
+ if 'CreationDate' in self.pdf_info:
+ pdf_created = self.pdf_info['CreationDate']
+ metadata_json: Optional[str] = None
+ if metadata:
+ metadata_json = json.dumps(metadata, sort_keys=True)
+ return (
+ self.sha1hex,
+ datetime.datetime.now(), # updated
+ self.status,
+ self.has_page0_thumbnail,
+ pdf_extra.get('page_count'),
+ word_count,
+ pdf_extra.get('page0_height'),
+ pdf_extra.get('page0_width'),
+ pdf_extra.get('permanent_id'),
+ pdf_created,
+ pdf_extra.get('pdf_version'),
+ metadata_json,
+ )
+
+
+def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtractResult:
+ """
+ A known issue is that output text is in "physical layout" mode, which means
+ columns will be side-by-side. We would prefer a single stream of tokens!
+
+ Tried using page.text(layout_mode=poppler.TextLayout.raw_order_layout)
+ instead of the default mode (poppler.TextLayout.physical_layout), but that
+ didn't seem to work at all (returned empty strings).
+ """
+ file_meta = gen_file_metadata(blob)
+ sha1hex = file_meta['sha1hex']
+ if file_meta['mimetype'] != 'application/pdf':
+ return PdfExtractResult(
+ sha1hex=sha1hex,
+ status='not-pdf',
+ error_msg=f"mimetype is '{file_meta['mimetype']}'",
+ file_meta=file_meta,
+ )
+
+ if sha1hex in BAD_PDF_SHA1HEX:
+ return PdfExtractResult(
+ sha1hex=sha1hex,
+ status='bad-pdf',
+            error_msg="PDF known to cause processing issues",
+ file_meta=file_meta,
+ )
+
+ print(f"\tpoppler processing: {sha1hex}", file=sys.stderr)
+ try:
+ pdf = poppler.load_from_data(blob)
+ if pdf is None:
+ return PdfExtractResult(
+ sha1hex=sha1hex,
+ status='empty-pdf',
+ file_meta=file_meta,
+ has_page0_thumbnail=False,
+ )
+ page0 = pdf.create_page(0)
+ if page0 is None:
+ return PdfExtractResult(
+ sha1hex=sha1hex,
+ status='empty-page0',
+ file_meta=file_meta,
+ )
+        # this call sometimes fails and returns an AttributeError
+ page0rect = page0.page_rect()
+ except (AttributeError, poppler.document.LockedDocumentError) as e:
+ # may need to expand the set of exceptions caught here over time, but
+ # starting with a narrow set
+ return PdfExtractResult(
+ sha1hex=sha1hex,
+ status='parse-error',
+ error_msg=str(e),
+ file_meta=file_meta,
+ )
+
+ assert page0 is not None
+ page0_thumbnail: Optional[bytes] = None
+ renderer = poppler.PageRenderer()
+ try:
+ full_img = renderer.render_page(page0)
+ img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw', "BGRA", 0, 1)
+ img.thumbnail(thumb_size, Image.BICUBIC)
+ buf = BytesIO()
+ img.save(buf, thumb_type)
+ page0_thumbnail = buf.getvalue()
+ # assuming that very small images mean something went wrong
+ if page0_thumbnail is None or len(page0_thumbnail) < 50:
+ page0_thumbnail = None
+ except Exception as e:
+ print(str(e), file=sys.stderr)
+ page0_thumbnail = None
+
+ try:
+ full_text = page0.text()
+ for n in range(1, pdf.pages):
+ pageN = pdf.create_page(n)
+ full_text += pageN.text()
+ except AttributeError as e:
+ return PdfExtractResult(
+ sha1hex=sha1hex,
+ status='parse-error',
+ error_msg=str(e),
+ file_meta=file_meta,
+ )
+
+ # Kafka message size limit; cap at about 1 MByte
+    if len(full_text) > 1000000:
+ return PdfExtractResult(
+ sha1hex=sha1hex,
+ status='text-too-large',
+ error_msg="full_text chars: {}".format(len(full_text)),
+ file_meta=file_meta,
+ )
+    if len(pdf.metadata) > 1000000:
+        return PdfExtractResult(
+            sha1hex=sha1hex,
+            status='text-too-large',
+            error_msg="meta_xml chars: {}".format(len(pdf.metadata)),
+ file_meta=file_meta,
+ )
+
+ try:
+ pdf_info = pdf.infos()
+ except UnicodeDecodeError:
+ return PdfExtractResult(
+ sha1hex=sha1hex,
+ status='bad-unicode',
+ error_msg="in infos()",
+ file_meta=file_meta,
+ )
+
+ # TODO: is this actually needed? or does json marshalling work automatically?
+ for k in pdf_info.keys():
+ if isinstance(pdf_info[k], datetime.datetime):
+ pdf_info[k] = datetime.datetime.isoformat(pdf_info[k])
+
+ permanent_id: Optional[str] = None
+ update_id: Optional[str] = None
+ try:
+ permanent_id = pdf.pdf_id.permanent_id
+ update_id = pdf.pdf_id.update_id
+ except TypeError:
+ pass
+
+ return PdfExtractResult(
+ sha1hex=sha1hex,
+ file_meta=file_meta,
+ status='success',
+ error_msg=None,
+ text=full_text or None,
+ has_page0_thumbnail=page0_thumbnail is not None,
+ page0_thumbnail=page0_thumbnail,
+ meta_xml=pdf.metadata or None,
+ pdf_info=pdf_info,
+ pdf_extra=dict(
+ page0_height=page0rect.height,
+ page0_width=page0rect.width,
+ page_count=pdf.pages,
+ permanent_id=permanent_id,
+ update_id=update_id,
+ pdf_version=f"{pdf.pdf_version[0]}.{pdf.pdf_version[1]}",
+ ),
+ )
+
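+# Stand-alone sketch (the file path is an assumed placeholder; the workers below
+# are the normal entry points, fed via Kafka and wayback):
+#
+#   with open("paper.pdf", "rb") as f:
+#       result = process_pdf(f.read())
+#   result.status, result.has_page0_thumbnail, len(result.text or "")
+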
+class PdfExtractWorker(SandcrawlerFetchWorker):
+
+ def __init__(self, wayback_client=None, sink=None, **kwargs):
+ super().__init__(wayback_client=wayback_client)
+ self.wayback_client = wayback_client
+ self.sink = sink
+ self.thumbnail_sink = kwargs.get('thumbnail_sink')
+
+ def timeout_response(self, task) -> Dict:
+ default_key = task['sha1hex']
+ return dict(
+ status="error-timeout",
+ error_msg="internal pdf-extract worker timeout",
+ source=task,
+ sha1hex=default_key,
+ )
+
+ def process(self, record, key: Optional[str] = None):
+ default_key = record['sha1hex']
+
+ fetch_result = self.fetch_blob(record)
+ if fetch_result['status'] != 'success':
+ return fetch_result
+ blob = fetch_result['blob']
+
+ result = process_pdf(blob)
+ result.source = record
+ if self.thumbnail_sink and result.page0_thumbnail is not None:
+ self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
+ return result.to_pdftext_dict()
+
+class PdfExtractBlobWorker(SandcrawlerWorker):
+ """
+ This is sort of like PdfExtractWorker, except it receives blobs directly,
+ instead of fetching blobs from some remote store.
+ """
+
+ def __init__(self, sink=None, **kwargs):
+ super().__init__()
+ self.sink = sink
+ self.thumbnail_sink = kwargs.get('thumbnail_sink')
+
+ def process(self, blob, key: Optional[str] = None):
+ if not blob:
+ return None
+ assert isinstance(blob, bytes)
+
+ result = process_pdf(blob)
+ if self.thumbnail_sink and result.page0_thumbnail is not None:
+ self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
+
+ return result.to_pdftext_dict()
+
diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py
new file mode 100644
index 0000000..161dc9c
--- /dev/null
+++ b/python/sandcrawler/pdftrio.py
@@ -0,0 +1,130 @@
+
+import time
+import requests
+
+from .workers import SandcrawlerWorker, SandcrawlerFetchWorker
+from .misc import gen_file_metadata, requests_retry_session
+
+
+class PdfTrioClient(object):
+
+ def __init__(self, host_url="http://pdftrio.qa.fatcat.wiki", **kwargs):
+ self.host_url = host_url
+ self.http_session = requests_retry_session(retries=3, backoff_factor=3)
+
+ def classify_pdf(self, blob, mode="auto"):
+ """
+ Returns a dict with at least:
+
+ - status_code (int, always set)
+ - status (success, or error-*)
+
+ On success, the other remote API JSON response keys are also included.
+
+ On HTTP-level failures, the status_code and status field are set
+ appropriately; an optional `error_msg` may also be set. For some other
+ errors, like connection failure, an exception is raised.
+ """
+ assert blob
+
+ try:
+ pdftrio_response = requests.post(
+ self.host_url + "/classify/research-pub/" + mode,
+ files={
+ 'pdf_content': blob,
+ },
+ timeout=60.0,
+ )
+ except requests.Timeout:
+ return {
+ 'status': 'error-timeout',
+ 'status_code': -4, # heritrix3 "HTTP timeout" code
+ 'error_msg': 'pdftrio request (HTTP POST) timeout',
+ }
+ except requests.exceptions.ConnectionError:
+ # crude back-off
+ time.sleep(2.0)
+ return {
+ 'status': 'error-connect',
+ 'status_code': -2, # heritrix3 "HTTP connect" code
+                'error_msg': 'pdftrio request connection timeout',
+ }
+
+ info = dict(
+ status_code=pdftrio_response.status_code,
+ )
+ if pdftrio_response.status_code == 200:
+ resp_json = pdftrio_response.json()
+ assert 'ensemble_score' in resp_json
+ assert 'status' in resp_json
+ assert 'versions' in resp_json
+ info.update(resp_json)
+ else:
+ info['status'] = 'error'
+ # TODO: might return JSON with some info?
+
+ info['_total_sec'] = pdftrio_response.elapsed.total_seconds()
+ return info
+
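+# Usage sketch for the client above (host URL is an assumed placeholder; on
+# success the returned dict includes the remote API fields, eg 'ensemble_score'):
+#
+#   client = PdfTrioClient(host_url="http://localhost:3939")
+#   with open("paper.pdf", "rb") as f:
+#       info = client.classify_pdf(f.read())
+#   info["status"], info["status_code"]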
+
+class PdfTrioWorker(SandcrawlerFetchWorker):
+ """
+ This class is basically copied directly from GrobidWorker
+ """
+
+ def __init__(self, pdftrio_client, wayback_client=None, sink=None, **kwargs):
+ super().__init__(wayback_client=wayback_client)
+ self.pdftrio_client = pdftrio_client
+ self.sink = sink
+
+ def process(self, record, key=None):
+ start_process = time.time()
+ default_key = record['sha1hex']
+ fetch_sec = None
+
+ start = time.time()
+ fetch_result = self.fetch_blob(record)
+ fetch_sec = time.time() - start
+ if fetch_result['status'] != 'success':
+ return fetch_result
+ blob = fetch_result['blob']
+
+ result = dict()
+ result['file_meta'] = gen_file_metadata(blob)
+ result['key'] = result['file_meta']['sha1hex']
+ result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob)
+ result['source'] = record
+ result['timing'] = dict(
+ pdftrio_sec=result['pdf_trio'].pop('_total_sec', None),
+ total_sec=time.time() - start_process,
+ )
+ if fetch_sec:
+ result['timing']['fetch_sec'] = fetch_sec
+ return result
+
+class PdfTrioBlobWorker(SandcrawlerWorker):
+ """
+ This is sort of like PdfTrioWorker, except it receives blobs directly,
+ instead of fetching blobs from some remote store.
+ """
+
+ def __init__(self, pdftrio_client, sink=None, mode="auto", **kwargs):
+ super().__init__()
+ self.pdftrio_client = pdftrio_client
+ self.sink = sink
+ self.mode = mode
+
+ def process(self, blob, key=None):
+ start_process = time.time()
+ if not blob:
+ return None
+ result = dict()
+ result['file_meta'] = gen_file_metadata(blob)
+ result['key'] = result['file_meta']['sha1hex']
+ result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob, mode=self.mode)
+ result['timing'] = dict(
+ pdftrio_sec=result['pdf_trio'].pop('_total_sec', None),
+ total_sec=time.time() - start_process,
+ )
+ return result
+
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
new file mode 100644
index 0000000..a388b90
--- /dev/null
+++ b/python/sandcrawler/persist.py
@@ -0,0 +1,584 @@
+
+"""
+cdx
+- read raw CDX, filter
+- push to SQL table
+
+ingest-file-result
+- read JSON format (batch)
+- cdx SQL push batch (on conflict skip)
+- file_meta SQL push batch (on conflict update)
+- ingest request push batch (on conflict skip)
+- ingest result push batch (on conflict update)
+
+grobid
+- reads JSON format (batch)
+- grobid2json
+- minio push (one-by-one)
+- grobid SQL push batch (on conflict update)
+- file_meta SQL push batch (on conflict update)
+"""
+
+import os
+from typing import Optional, AnyStr
+import xml.etree.ElementTree
+
+from sandcrawler.workers import SandcrawlerWorker
+from sandcrawler.db import SandcrawlerPostgresClient
+from sandcrawler.minio import SandcrawlerMinioClient
+from sandcrawler.grobid import GrobidClient
+from sandcrawler.pdfextract import PdfExtractResult
+from sandcrawler.html_ingest import HtmlMetaRow
+
+
+class PersistCdxWorker(SandcrawlerWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+
+ def process(self, record, key=None):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+ # filter to full CDX lines, no liveweb
+ cdx_batch = [r for r in batch if r.get('warc_path') and ("/" in r['warc_path'])]
+ resp = self.db.insert_cdx(self.cur, cdx_batch)
+ if len(cdx_batch) < len(batch):
+ self.counts['skip'] += len(batch) - len(cdx_batch)
+ self.counts['insert-cdx'] += resp[0]
+ self.counts['update-cdx'] += resp[1]
+ self.db.commit()
+ return []
+
+class PersistIngestFileResultWorker(SandcrawlerWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+
+ def process(self, record, key=None):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def request_to_row(self, raw):
+ """
+ Converts ingest-request JSON schema (eg, from Kafka) to SQL ingest_request schema
+
+ if there is a problem with conversion, return None
+ """
+ # backwards compat hacks; transform request to look like current schema
+ if raw.get('ingest_type') == 'file':
+ raw['ingest_type'] = 'pdf'
+ if (not raw.get('link_source')
+ and raw.get('base_url')
+ and raw.get('ext_ids', {}).get('doi')
+ and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])):
+ # set link_source(_id) for old ingest requests
+ raw['link_source'] = 'doi'
+ raw['link_source_id'] = raw['ext_ids']['doi']
+ if (not raw.get('link_source')
+ and raw.get('ingest_request_source', '').startswith('savepapernow')
+ and raw.get('fatcat', {}).get('release_ident')):
+ # set link_source(_id) for old ingest requests
+ raw['link_source'] = 'spn'
+ raw['link_source_id'] = raw['fatcat']['release_ident']
+
+ for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'):
+            if k not in raw:
+ self.counts['skip-request-fields'] += 1
+ return None
+ if raw['ingest_type'] not in ('pdf', 'xml', 'html'):
+ self.counts['skip-ingest-type'] += 1
+ return None
+ request = {
+ 'ingest_type': raw['ingest_type'],
+ 'base_url': raw['base_url'],
+ 'link_source': raw['link_source'],
+ 'link_source_id': raw['link_source_id'],
+ 'ingest_request_source': raw.get('ingest_request_source'),
+ 'request': {},
+ }
+ # extra/optional fields
+ if raw.get('release_stage'):
+ request['release_stage'] = raw['release_stage']
+ if raw.get('fatcat', {}).get('release_ident'):
+ request['request']['release_ident'] = raw['fatcat']['release_ident']
+ for k in ('ext_ids', 'edit_extra', 'rel'):
+ if raw.get(k):
+ request['request'][k] = raw[k]
+ # if this dict is empty, trim it to save DB space
+ if not request['request']:
+ request['request'] = None
+ return request
+
+
+ def file_result_to_row(self, raw: dict) -> Optional[dict]:
+ """
+ Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema
+
+ if there is a problem with conversion, return None and set skip count
+ """
+ for k in ('request', 'hit', 'status'):
+            if k not in raw:
+ self.counts['skip-result-fields'] += 1
+ return None
+        if 'base_url' not in raw['request']:
+ self.counts['skip-result-fields'] += 1
+ return None
+ ingest_type = raw['request'].get('ingest_type')
+ if ingest_type == 'file':
+ ingest_type = 'pdf'
+ if ingest_type not in ('pdf', 'xml', 'html'):
+ self.counts['skip-ingest-type'] += 1
+ return None
+ if raw['status'] in ("existing", ):
+ self.counts['skip-existing'] += 1
+ return None
+ result = {
+ 'ingest_type': ingest_type,
+ 'base_url': raw['request']['base_url'],
+ 'hit': raw['hit'],
+ 'status': raw['status'],
+ }
+ terminal = raw.get('terminal')
+ if terminal:
+ result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url')
+ result['terminal_dt'] = terminal.get('terminal_dt')
+ result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code')
+ if result['terminal_status_code']:
+ result['terminal_status_code'] = int(result['terminal_status_code'])
+ result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
+            if result['terminal_url'] and len(result['terminal_url']) > 2048:
+ # postgresql13 doesn't like extremely large URLs in b-tree index
+ self.counts['skip-huge-url'] += 1
+ return None
+ return result
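+
+    # Illustrative example (values invented): an ingest-result message like
+    #   {"request": {"ingest_type": "pdf", "base_url": "https://example.com/a.pdf"},
+    #    "hit": true, "status": "success",
+    #    "terminal": {"terminal_url": "https://example.com/a.pdf",
+    #                 "terminal_dt": "20200101000000", "terminal_status_code": 200}}
+    # maps to a row with keys: ingest_type, base_url, hit, status, terminal_url,
+    # terminal_dt, terminal_status_code, terminal_sha1hex.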
+
+ def result_to_html_meta(self, record: dict) -> Optional[HtmlMetaRow]:
+ html_body = record.get('html_body')
+ file_meta = record.get('file_meta')
+ if not (file_meta and html_body):
+ return None
+ return HtmlMetaRow(
+ sha1hex=file_meta["sha1hex"],
+ status=record.get('status'),
+ scope=record.get('scope'),
+ has_teixml=bool(html_body and html_body['status'] == 'success'),
+ has_thumbnail=False, # TODO
+ word_count=(html_body and html_body.get('word_count')) or None,
+ biblio=record.get('html_biblio'),
+ resources=record.get('html_resources'),
+ )
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ if not batch:
+ return []
+
+ results = [self.file_result_to_row(raw) for raw in batch]
+ results = [r for r in results if r]
+
+ requests = [self.request_to_row(raw['request']) for raw in batch if raw.get('request')]
+ requests = [r for r in requests if r]
+
+ if requests:
+ resp = self.db.insert_ingest_request(self.cur, requests)
+ self.counts['insert-requests'] += resp[0]
+ self.counts['update-requests'] += resp[1]
+ if results:
+ resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update")
+ self.counts['insert-results'] += resp[0]
+ self.counts['update-results'] += resp[1]
+
+ # these schemas match, so can just pass through
+ cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')]
+ revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')]
+ cdx_batch.extend(revisit_cdx_batch)
+ # filter to full CDX lines, with full warc_paths (not liveweb)
+ cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])]
+ if cdx_batch:
+ resp = self.db.insert_cdx(self.cur, cdx_batch)
+ self.counts['insert-cdx'] += resp[0]
+ self.counts['update-cdx'] += resp[1]
+
+ file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
+ if file_meta_batch:
+ resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="nothing")
+ self.counts['insert-file_meta'] += resp[0]
+ self.counts['update-file_meta'] += resp[1]
+
+        html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')]
+        # result_to_html_meta() can return None; drop those before inserting
+        html_meta_batch = [m for m in html_meta_batch if m]
+ if html_meta_batch:
+ resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="update")
+ self.counts['insert-html_meta'] += resp[0]
+ self.counts['update-html_meta'] += resp[1]
+
+ self.db.commit()
+ return []
+
+class PersistIngestRequestWorker(PersistIngestFileResultWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__(db_url=db_url)
+
+ def process(self, record, key=None):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ if not batch:
+ return []
+
+ requests = [self.request_to_row(raw) for raw in batch]
+ requests = [r for r in requests if r]
+
+ if requests:
+ resp = self.db.insert_ingest_request(self.cur, requests)
+ self.counts['insert-requests'] += resp[0]
+ self.counts['update-requests'] += resp[1]
+
+ self.db.commit()
+ return []
+
+class PersistGrobidWorker(SandcrawlerWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.grobid = GrobidClient()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_only = kwargs.get('s3_only', False)
+ self.db_only = kwargs.get('db_only', False)
+ assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed"
+ if not self.s3_only:
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+ else:
+ self.db = None
+ self.cur = None
+
+ def process(self, record, key=None):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ # filter out bad "missing status_code" timeout rows
+ missing = [r for r in batch if not r.get('status_code')]
+ if missing:
+ self.counts['skip-missing-status'] += len(missing)
+ batch = [r for r in batch if r.get('status_code')]
+
+ for r in batch:
+ if r['status_code'] != 200 or not r.get('tei_xml'):
+ self.counts['s3-skip-status'] += 1
+ if r.get('error_msg'):
+ r['metadata'] = {'error_msg': r['error_msg'][:500]}
+ continue
+
+ assert len(r['key']) == 40
+ if not self.db_only:
+ resp = self.s3.put_blob(
+ folder="grobid",
+ blob=r['tei_xml'],
+ sha1hex=r['key'],
+ extension=".tei.xml",
+ )
+ self.counts['s3-put'] += 1
+
+ # enhance with teixml2json metadata, if available
+ try:
+ metadata = self.grobid.metadata(r)
+ except xml.etree.ElementTree.ParseError as xml_e:
+ r['status'] = 'bad-grobid-xml'
+ r['metadata'] = {'error_msg': str(xml_e)[:1024]}
+ continue
+ if not metadata:
+ continue
+ for k in ('fatcat_release', 'grobid_version'):
+ r[k] = metadata.pop(k, None)
+ if r.get('fatcat_release'):
+ r['fatcat_release'] = r['fatcat_release'].replace('release_', '')
+ if metadata.get('grobid_timestamp'):
+ r['updated'] = metadata['grobid_timestamp']
+ r['metadata'] = metadata
+
+ if not self.s3_only:
+ resp = self.db.insert_grobid(self.cur, batch, on_conflict="update")
+ self.counts['insert-grobid'] += resp[0]
+ self.counts['update-grobid'] += resp[1]
+
+ file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+ resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
+ self.counts['insert-file-meta'] += resp[0]
+ self.counts['update-file-meta'] += resp[1]
+
+ self.db.commit()
+
+ return []
+
+
+class PersistGrobidDiskWorker(SandcrawlerWorker):
+ """
+ Writes blobs out to disk.
+
+ This could be refactored into a "Sink" type with an even thinner wrapper.
+ """
+
+ def __init__(self, output_dir):
+ super().__init__()
+ self.output_dir = output_dir
+
+ def _blob_path(self, sha1hex, extension=".tei.xml"):
+ obj_path = "{}/{}/{}{}".format(
+ sha1hex[0:2],
+ sha1hex[2:4],
+ sha1hex,
+ extension,
+ )
+ return obj_path
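+
+    # Example: _blob_path("da39a3ee5e6b4b0d3255bfef95601890afd80709") returns
+    # "da/39/da39a3ee5e6b4b0d3255bfef95601890afd80709.tei.xml", ie blobs are
+    # sharded into two directory levels by sha1hex prefix.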
+
+ def process(self, record, key=None):
+
+ if record.get('status_code') != 200 or not record.get('tei_xml'):
+ return False
+        assert len(record['key']) == 40
+ p = "{}/{}".format(self.output_dir, self._blob_path(record['key']))
+ os.makedirs(os.path.dirname(p), exist_ok=True)
+ with open(p, 'w') as f:
+ f.write(record.pop('tei_xml'))
+ self.counts['written'] += 1
+ return record
+
+
+class PersistPdfTrioWorker(SandcrawlerWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+
+ def process(self, record, key=None):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ batch = [r for r in batch if 'pdf_trio' in r and r['pdf_trio'].get('status_code')]
+ for r in batch:
+ # copy key (sha1hex) into sub-object
+ r['pdf_trio']['key'] = r['key']
+ pdftrio_batch = [r['pdf_trio'] for r in batch]
+ resp = self.db.insert_pdftrio(self.cur, pdftrio_batch, on_conflict="update")
+ self.counts['insert-pdftrio'] += resp[0]
+ self.counts['update-pdftrio'] += resp[1]
+
+ file_meta_batch = [r['file_meta'] for r in batch if r['pdf_trio']['status'] == "success" and r.get('file_meta')]
+ resp = self.db.insert_file_meta(self.cur, file_meta_batch)
+ self.counts['insert-file-meta'] += resp[0]
+ self.counts['update-file-meta'] += resp[1]
+
+ self.db.commit()
+ return []
+
+
+class PersistPdfTextWorker(SandcrawlerWorker):
+ """
+ Pushes text file to blob store (S3/seaweed/minio) and PDF metadata to SQL table.
+
+ Should keep batch sizes small.
+ """
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_only = kwargs.get('s3_only', False)
+ self.db_only = kwargs.get('db_only', False)
+ assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed"
+ if not self.s3_only:
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+ else:
+ self.db = None
+ self.cur = None
+
+ def process(self, record, key=None):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ parsed_batch = []
+ for r in batch:
+ parsed_batch.append(PdfExtractResult.from_pdftext_dict(r))
+
+ for r in parsed_batch:
+ if r.status != 'success' or not r.text:
+ self.counts['s3-skip-status'] += 1
+ if r.error_msg:
+ r.metadata = {'error_msg': r.error_msg[:500]}
+ continue
+
+ assert len(r.sha1hex) == 40
+ if not self.db_only:
+ resp = self.s3.put_blob(
+ folder="text",
+ blob=r.text,
+ sha1hex=r.sha1hex,
+ extension=".txt",
+ )
+ self.counts['s3-put'] += 1
+
+ if not self.s3_only:
+ resp = self.db.insert_pdf_meta(self.cur, parsed_batch, on_conflict="update")
+ self.counts['insert-pdf-meta'] += resp[0]
+ self.counts['update-pdf-meta'] += resp[1]
+
+ file_meta_batch = [r.file_meta for r in parsed_batch if r.file_meta]
+ resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
+ self.counts['insert-file-meta'] += resp[0]
+ self.counts['update-file-meta'] += resp[1]
+
+ self.db.commit()
+
+ return []
+
+
+class PersistThumbnailWorker(SandcrawlerWorker):
+ """
+    Pushes thumbnail image blobs to the blob store (S3/seaweed/minio). Does
+    not write to the SQL database.
+
+    This worker *must* be used with raw kafka mode; thumbnails are *not*
+ wrapped in JSON like most sandcrawler kafka messages.
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_extension = kwargs.get('s3_extension', ".jpg")
+ self.s3_folder = kwargs.get('s3_folder', "pdf")
+
+ def process(self, blob: bytes, key: Optional[str] = None):
+ """
+ Processing raw messages, not decoded JSON objects
+ """
+
+ if isinstance(key, bytes):
+ key = key.decode('utf-8')
+ assert key is not None and len(key) == 40 and isinstance(key, str)
+ assert isinstance(blob, bytes)
+ assert len(blob) >= 50
+
+ resp = self.s3.put_blob(
+ folder=self.s3_folder,
+ blob=blob,
+ sha1hex=key,
+ extension=self.s3_extension,
+ )
+ self.counts['s3-put'] += 1
+
+
+class GenericPersistDocWorker(SandcrawlerWorker):
+ """
+ Pushes blobs from Kafka to S3.
+
+ Objects are assumed to be JSON-wrapped strings.
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__()
+ self.s3 = SandcrawlerMinioClient(
+ host_url=kwargs.get('s3_url', 'localhost:9000'),
+ access_key=kwargs['s3_access_key'],
+ secret_key=kwargs['s3_secret_key'],
+ default_bucket=kwargs['s3_bucket'],
+ )
+ self.s3_extension = kwargs.get('s3_extension', ".unknown")
+ self.s3_folder = kwargs.get('s3_folder', "unknown")
+ self.doc_key = "unknown"
+
+ def process(self, record: dict, key: Optional[AnyStr] = None) -> None:
+
+ if record.get('status') != 'success' or not record.get(self.doc_key):
+ return
+
+ assert key is not None
+        if isinstance(key, bytes):
+            key_str = key.decode('utf-8')
+        elif isinstance(key, str):
+            key_str = key
+        else:
+            raise TypeError("expected key to be str or bytes, got: {}".format(type(key)))
+ assert len(key_str) == 40
+ if 'sha1hex' in record:
+ assert key_str == record['sha1hex']
+
+ resp = self.s3.put_blob(
+ folder=self.s3_folder,
+ blob=record[self.doc_key].encode('utf-8'),
+ sha1hex=key_str,
+ extension=self.s3_extension,
+ )
+ self.counts['s3-put'] += 1
+
+
+class PersistXmlDocWorker(GenericPersistDocWorker):
+ """
+ Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
+ sandcrawler database (SQL).
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self.s3_extension = kwargs.get('s3_extension', ".jats.xml")
+ self.s3_folder = kwargs.get('s3_folder', "xml_doc")
+ self.doc_key = "jats_xml"
+
+
+class PersistHtmlTeiXmlWorker(GenericPersistDocWorker):
+ """
+ Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
+ sandcrawler database (SQL).
+ """
+
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self.s3_extension = kwargs.get('s3_extension', ".tei.xml")
+ self.s3_folder = kwargs.get('s3_folder', "html_body")
+ self.doc_key = "tei_xml"
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
new file mode 100644
index 0000000..37e3d7a
--- /dev/null
+++ b/python/sandcrawler/workers.py
@@ -0,0 +1,625 @@
+
+import sys
+import json
+import time
+import signal
+import zipfile
+import requests
+import multiprocessing.pool
+from collections import Counter
+from confluent_kafka import Consumer, Producer, KafkaException
+
+from .misc import parse_cdx_line
+from .ia import SandcrawlerBackoffError, WaybackError, WaybackContentError, PetaboxError
+
+
+class SandcrawlerWorker(object):
+ """
+ Base class for sandcrawler workers.
+
+ Usually these get "pushed" into by a RecordPusher. Output goes to another
+ worker (pipeline-style), or defaults to stdout.
+ """
+
+ def __init__(self):
+ self.counts = Counter()
+ self.sink = None
+ # TODO: self.counters
+
+ def push_record(self, task, key=None):
+ self.counts['total'] += 1
+ if not self.want(task):
+ self.counts['skip'] += 1
+ return
+ result = self.process(task, key=key)
+ if not result:
+ self.counts['failed'] += 1
+ return
+ elif type(result) == dict and 'status' in result and len(result['status']) < 32:
+ self.counts[result['status']] += 1
+
+ if self.sink:
+ self.sink.push_record(result)
+ self.counts['pushed'] += 1
+ else:
+ print(json.dumps(result))
+ return result
+
+ def timeout_response(self, task):
+ """
+ This should be overridden by workers that want to return something
+ meaningful when there is a processing timeout. Eg, JSON vs some other
+ error message.
+ """
+ return None
+
+ def push_record_timeout(self, task, key=None, timeout=300):
+ """
+ A wrapper around self.push_record which sets a timeout.
+
+ Note that this uses signals and *will behave wrong/weirdly* with
+ multithreading or if signal-based timeouts are used elsewhere in the
+ same process.
+ """
+
+ def timeout_handler(signum, frame):
+ raise TimeoutError("timeout processing record")
+ signal.signal(signal.SIGALRM, timeout_handler)
+ resp = None
+ signal.alarm(int(timeout))
+ try:
+ resp = self.push_record(task, key=key)
+ except TimeoutError:
+ self.counts['timeout'] += 1
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ # TODO: what if it is this push_record() itself that is timing out?
+ if resp and self.sink:
+ self.sink.push_record(resp)
+ self.counts['pushed'] += 1
+ elif resp:
+ print(json.dumps(resp))
+ finally:
+ signal.alarm(0)
+ return resp
+
+ def push_batch(self, tasks):
+ results = []
+ for task in tasks:
+ results.append(self.push_record(task))
+ return results
+
+ def finish(self):
+ if self.sink:
+ self.sink.finish()
+ print("Worker: {}".format(self.counts), file=sys.stderr)
+ return self.counts
+
+ def want(self, task):
+ """
+ Optionally override this as a filter in implementations.
+ """
+ return True
+
+ def process(self, task, key=None):
+ """
+ Derived workers need to implement business logic here.
+ """
+ raise NotImplementedError('implementation required')
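+
+
+# Illustrative sketch (hypothetical worker, not part of the original module):
+# a minimal subclass only needs to implement process(), and can optionally
+# override want() as a filter.
+class _ExampleUppercaseWorker(SandcrawlerWorker):
+
+    def want(self, task):
+        return isinstance(task, dict) and 'text' in task
+
+    def process(self, task, key=None):
+        # hypothetical business logic: upper-case a text field
+        return dict(key=key, status="success", text=task['text'].upper())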
+
+
+class SandcrawlerFetchWorker(SandcrawlerWorker):
+ """
+ Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
+ PDFs) from wayback, archive.org, or other sources.
+ """
+
+ def __init__(self, wayback_client, **kwargs):
+ super().__init__(**kwargs)
+ self.wayback_client = wayback_client
+
+ def fetch_blob(self, record):
+ start_process = time.time()
+ default_key = record['sha1hex']
+ wayback_sec = None
+ petabox_sec = None
+
+ if record.get('warc_path') and record.get('warc_offset'):
+ # it's a full CDX dict. fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this PdfTrioWorker")
+ try:
+ start = time.time()
+ blob = self.wayback_client.fetch_petabox_body(
+ csize=record['warc_csize'],
+ offset=record['warc_offset'],
+ warc_path=record['warc_path'],
+ )
+ wayback_sec = time.time() - start
+ except (WaybackError, WaybackContentError, PetaboxError, KeyError) as we:
+ return dict(
+ key=default_key,
+ source=record,
+ status="error-wayback",
+ error_msg=str(we),
+ )
+ elif record.get('url') and record.get('datetime'):
+ # it's a partial CDX dict or something? fetch using WaybackClient
+ if not self.wayback_client:
+ raise Exception("wayback client not configured for this PdfTrioWorker")
+ try:
+ start = time.time()
+ blob = self.wayback_client.fetch_replay_body(
+ url=record['url'],
+ datetime=record['datetime'],
+ )
+ wayback_sec = time.time() - start
+ except (WaybackError, WaybackContentError) as we:
+ return dict(
+ key=default_key,
+ source=record,
+ status="error-wayback",
+ error_msg=str(we),
+ )
+ elif record.get('item') and record.get('path'):
+ # it's petabox link; fetch via HTTP
+ start = time.time()
+ resp = requests.get("https://archive.org/serve/{}/{}".format(
+ record['item'], record['path']))
+ petabox_sec = time.time() - start
+ try:
+ resp.raise_for_status()
+ except Exception as e:
+ return dict(
+ key=default_key,
+ source=record,
+ status="error-petabox",
+ error_msg=str(e),
+ )
+ blob = resp.content
+ else:
+ raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ if not blob:
+ return dict(
+ key=default_key,
+ source=record,
+ status="empty-blob",
+ )
+ return dict(
+ key=default_key,
+ status="success",
+ source=record,
+ blob=blob,
+ )
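+
+    # Illustrative record shapes (values invented) that fetch_blob() accepts:
+    #   full CDX row:   {"sha1hex": "...", "warc_path": "X/Y.warc.gz", "warc_offset": 123, "warc_csize": 456}
+    #   replay fetch:   {"sha1hex": "...", "url": "https://example.com/a.pdf", "datetime": "20200101000000"}
+    #   petabox fetch:  {"sha1hex": "...", "item": "example_item", "path": "files/a.pdf"}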
+
+class MultiprocessWrapper(SandcrawlerWorker):
+
+ def __init__(self, worker, sink, jobs=None):
+ self.counts = Counter()
+ self.worker = worker
+ self.sink = sink
+ self.pool = multiprocessing.pool.Pool(jobs)
+
+ def push_batch(self, tasks):
+ self.counts['total'] += len(tasks)
+ print("... processing batch of: {}".format(len(tasks)), file=sys.stderr)
+ results = self.pool.map(self.worker.process, tasks)
+ for result in results:
+ if not result:
+ self.counts['failed'] += 1
+                continue
+ elif type(result) == dict and 'status' in result and len(result['status']) < 32:
+ self.counts[result['status']] += 1
+
+ if self.sink:
+ self.sink.push_record(result)
+ self.counts['pushed'] += 1
+ else:
+ print(json.dumps(result))
+ return results
+
+ def finish(self):
+ self.pool.terminate()
+ if self.sink:
+ self.sink.finish()
+ worker_counts = self.worker.finish()
+ print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
+ return worker_counts
+
+class BlackholeSink(SandcrawlerWorker):
+ """
+    Dummy SandcrawlerWorker that doesn't do or process anything.
+
+ Useful for tests.
+ """
+
+ def push_record(self, task, key=None):
+ return
+
+ def push_batch(self, tasks):
+ return
+
+class KafkaSink(SandcrawlerWorker):
+
+ def __init__(self, kafka_hosts, produce_topic, **kwargs):
+ self.sink = None
+ self.counts = Counter()
+ self.produce_topic = produce_topic
+ self.kafka_hosts = kafka_hosts
+
+ config = self.producer_config({
+ 'bootstrap.servers': kafka_hosts,
+ 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
+ 'api.version.request': True,
+ 'api.version.fallback.ms': 0,
+ })
+ self.producer = Producer(config)
+
+
+ @staticmethod
+ def _fail_fast(err, msg):
+ if err is not None:
+ print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+
+ def producer_config(self, kafka_config):
+ config = kafka_config.copy()
+ config.update({
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'message.timeout.ms': 30000,
+ 'request.required.acks': -1, # all brokers must confirm
+ }
+ })
+ return config
+
+ def push_record(self, msg, key=None):
+ self.counts['total'] += 1
+ if type(msg) == dict:
+ if not key and 'key' in msg:
+ key = msg['key']
+ msg = json.dumps(msg)
+ if type(msg) == str:
+ msg = msg.encode('utf-8')
+ assert type(msg) == bytes
+
+ self.producer.produce(
+ self.produce_topic,
+ msg,
+ key=key,
+ on_delivery=self._fail_fast)
+ self.counts['produced'] += 1
+
+ # check for errors etc
+ self.producer.poll(0)
+
+ def push_batch(self, msgs):
+ for m in msgs:
+ self.push_record(m)
+
+ def finish(self):
+ self.producer.flush()
+ return self.counts
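+
+    # Illustrative usage (broker and topic names are placeholders):
+    #
+    #   sink = KafkaSink(kafka_hosts="localhost:9092", produce_topic="example-output")
+    #   sink.push_record({"key": "somekey", "status": "success"})
+    #   sink.finish()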
+
+
+class KafkaCompressSink(KafkaSink):
+ """
+ Variant of KafkaSink for large documents. Used for, eg, GROBID output.
+ """
+
+ def producer_config(self, kafka_config):
+ config = kafka_config.copy()
+ config.update({
+ 'compression.codec': 'gzip',
+ 'retry.backoff.ms': 250,
+ 'linger.ms': 1000,
+ 'batch.num.messages': 50,
+ 'delivery.report.only.error': True,
+ 'default.topic.config': {
+ 'message.timeout.ms': 30000,
+ 'request.required.acks': -1, # all brokers must confirm
+ }
+ })
+ return config
+
+
+class RecordPusher:
+ """
+ Base class for different record sources to be pushed into workers. Pretty
+ trivial interface, just wraps an importer and pushes records in to it.
+ """
+
+ def __init__(self, worker, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+
+ def run(self):
+ """
+ This will look something like:
+
+ for line in sys.stdin:
+ record = json.loads(line)
+ self.worker.push_record(record)
+ print(self.worker.finish())
+ """
+ raise NotImplementedError
+
+
+class JsonLinePusher(RecordPusher):
+
+ def __init__(self, worker, json_file, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.json_file = json_file
+ self.batch_size = kwargs.get('batch_size', None)
+ if self.batch_size in (0, 1):
+ self.batch_size = None
+
+ def run(self):
+ batch = []
+ for line in self.json_file:
+ if not line:
+ continue
+ self.counts['total'] += 1
+ try:
+ record = json.loads(line)
+ except json.decoder.JSONDecodeError:
+ self.counts['error-json-decode'] += 1
+ continue
+ if self.batch_size:
+ batch.append(record)
+ if len(batch) >= self.batch_size:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ else:
+ self.worker.push_record(record)
+ self.counts['pushed'] += 1
+ if self.batch_size and batch:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ worker_counts = self.worker.finish()
+ print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
+ return self.counts
+
+
+class CdxLinePusher(RecordPusher):
+
+ def __init__(self, worker, cdx_file, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.cdx_file = cdx_file
+ self.filter_http_statuses = kwargs.get('filter_http_statuses', None)
+ self.filter_mimetypes = kwargs.get('filter_mimetypes', None)
+ self.allow_octet_stream = kwargs.get('allow_octet_stream', False)
+ self.batch_size = kwargs.get('batch_size', None)
+ if self.batch_size in (0, 1):
+ self.batch_size = None
+
+ def run(self):
+ batch = []
+ for line in self.cdx_file:
+ if not line:
+ continue
+ self.counts['total'] += 1
+ record = parse_cdx_line(line, normalize=True)
+ if not record:
+ self.counts['skip-parse'] += 1
+ continue
+ if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses:
+ self.counts['skip-http_status'] += 1
+ continue
+ if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
+ self.counts['skip-mimetype'] += 1
+ continue
+ if self.batch_size:
+ batch.append(record)
+ if len(batch) >= self.batch_size:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ else:
+ self.worker.push_record(record)
+ self.counts['pushed'] += 1
+ if self.batch_size and batch:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ worker_counts = self.worker.finish()
+ print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
+ return self.counts
+
+
+class ZipfilePusher(RecordPusher):
+
+ def __init__(self, worker, zipfile_path, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.filter_suffix = ".pdf"
+ self.zipfile_path = zipfile_path
+ self.batch_size = kwargs.get('batch_size', None)
+ if self.batch_size in (0, 1):
+ self.batch_size = None
+
+ def run(self):
+ batch = []
+ with zipfile.ZipFile(self.zipfile_path, 'r') as archive:
+ for zipinfo in archive.infolist():
+ if not zipinfo.filename.endswith(self.filter_suffix):
+ continue
+ self.counts['total'] += 1
+                # NB: this doesn't really extract the file; it just returns a stream (file-like object) for reading it
+ flo = archive.open(zipinfo, 'r')
+ data = flo.read(2**32)
+ flo.close()
+ if self.batch_size:
+ batch.append(data)
+ if len(batch) >= self.batch_size:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ else:
+ self.worker.push_record(data)
+ self.counts['pushed'] += 1
+ if self.batch_size and batch:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ worker_counts = self.worker.finish()
+ print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
+ return self.counts
+
+class KafkaJsonPusher(RecordPusher):
+
+ def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
+ self.counts = Counter()
+ self.worker = worker
+ self.consumer = make_kafka_consumer(
+ kafka_hosts,
+ consume_topic,
+ group,
+ )
+ self.push_batches = kwargs.get('push_batches', False)
+ self.raw_records = kwargs.get('raw_records', False)
+ self.poll_interval = kwargs.get('poll_interval', 5.0)
+ self.batch_size = kwargs.get('batch_size', 100)
+ if self.batch_size in (0, 1):
+ self.batch_size = 1
+ self.batch_worker = kwargs.get('batch_worker', False)
+ self.process_timeout_sec = kwargs.get('process_timeout_sec', 300)
+
+ def run(self):
+ while True:
+ # TODO: this is batch-oriented, because underlying worker is
+            # often batch-oriented, but this doesn't confirm that the entire
+            # batch has been pushed to fatcat before committing the offset. Eg,
+            # consider the case where there is one update and thousands of
+            # creates; the update could linger in the worker, and if the worker
+            # crashed it would never be created. Not great.
+ batch = self.consumer.consume(
+ num_messages=self.batch_size,
+ timeout=self.poll_interval)
+ print("... got {} kafka messages ({}sec poll interval)".format(
+ len(batch), self.poll_interval),
+ file=sys.stderr)
+ if not batch:
+ # TODO: could have some larger timeout here and
+ # self.worker.finish() if it's been more than, eg, a couple
+ # minutes
+ continue
+ # first check errors on entire batch...
+ for msg in batch:
+ if msg.error():
+ raise KafkaException(msg.error())
+ # ... then process
+ if self.push_batches:
+ self.counts['total'] += len(batch)
+ records = [json.loads(msg.value().decode('utf-8')) for msg in batch]
+ self.worker.push_batch(records)
+ self.counts['pushed'] += len(batch)
+ print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
+ else:
+ for msg in batch:
+ self.counts['total'] += 1
+ if self.raw_records:
+ # In this mode, pass the Kafka message as bytes through
+ # without decoding as JSON. Eg, for thumbnails (where
+                        # message bytes are JPEG, and we need the sha1hex key
+ # from the message)
+ record = msg.value()
+ else:
+ record = json.loads(msg.value().decode('utf-8'))
+ # This complex bit of code implements backoff/backpressure
+ # in a way that will not cause this Kafka consumer to lose
+ # partition assignments (resulting in a rebalance). This
+ # was needed for the ingest workers. There is probably a
+ # better way to structure this concurrency.
+ done = False
+ while not done:
+ try:
+ # use timeouts; don't want kafka itself to timeout
+ self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec)
+ break
+ except SandcrawlerBackoffError as be:
+ print("Backing off for 200 seconds: {}".format(be))
+ self.consumer.pause(self.consumer.assignment())
+ for i in range(40):
+ # Beware this poll which should not be
+ # receiving any messages because we are paused!
+ empty_batch = self.consumer.poll(0)
+ assert not empty_batch
+ time.sleep(5)
+ self.consumer.resume(self.consumer.assignment())
+ self.counts['pushed'] += 1
+ if self.counts['total'] % 500 == 0:
+ print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
+ for msg in batch:
+ # locally store offsets of processed messages; will be
+ # auto-commited by librdkafka from this "stored" value
+ self.consumer.store_offsets(message=msg)
+
+ # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
+ # commit the current batch if it has been lingering
+ worker_counts = self.worker.finish()
+ print("KafkaJson lines pushed: {}".format(self.counts), file=sys.stderr)
+ self.consumer.close()
+ return self.counts
+
+
+def make_kafka_consumer(hosts, consume_topic, group):
+ topic_name = consume_topic
+
+ def fail_fast(err, partitions):
+ if err is not None:
+ print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ for p in partitions:
+ # check for partition-specific commit errors
+ if p.error:
+ print("Kafka consumer commit error: {}".format(p.error), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(p.error)
+ #print("Kafka consumer commit successful")
+ pass
+
+ # previously, using pykafka
+ #auto_commit_enable=True,
+ #auto_commit_interval_ms=30000, # 30 seconds
+ conf = {
+ 'bootstrap.servers': hosts,
+ 'group.id': group,
+ 'on_commit': fail_fast,
+ # messages don't have offset marked as stored until processed,
+ # but we do auto-commit stored offsets to broker
+ 'enable.auto.offset.store': False,
+ 'enable.auto.commit': True,
+ # user code timeout; if no poll after this long, assume user code
+        # hung and trigger a rebalance (set here to 6 minutes)
+ 'max.poll.interval.ms': 360000,
+ 'default.topic.config': {
+ 'auto.offset.reset': 'latest',
+ },
+ }
+
+ def on_rebalance(consumer, partitions):
+ for p in partitions:
+ if p.error:
+ raise KafkaException(p.error)
+ print("Kafka partitions rebalanced: {} / {}".format(
+ consumer, partitions),
+ file=sys.stderr)
+
+ consumer = Consumer(conf)
+ # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
+ # encoded)
+ consumer.subscribe([topic_name],
+ on_assign=on_rebalance,
+ on_revoke=on_rebalance,
+ )
+ print("Consuming from kafka topic {}, group {}".format(topic_name, group), file=sys.stderr)
+ return consumer
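+
+# Illustrative sketch (not part of the original module; hosts, topics, group,
+# and SomeWorker are placeholders): a typical consumer pipeline wires a sink,
+# a worker, and a pusher together:
+#
+#   sink = KafkaSink(kafka_hosts="localhost:9092", produce_topic="example-output")
+#   worker = SomeWorker(sink=sink)   # any SandcrawlerWorker subclass
+#   pusher = KafkaJsonPusher(worker, "localhost:9092", "example-input", "example-group")
+#   pusher.run()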
diff --git a/python/sandcrawler/xml.py b/python/sandcrawler/xml.py
new file mode 100644
index 0000000..7a0086d
--- /dev/null
+++ b/python/sandcrawler/xml.py
@@ -0,0 +1,7 @@
+
+import xml.etree.ElementTree as ET
+
+
+def xml_reserialize(raw: bytes) -> str:
+ root = ET.fromstring(raw)
+ return '<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(root, encoding="unicode")
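+
+
+# Illustrative usage (not part of the original module):
+#
+#   xml_str = xml_reserialize(b"<article><title>Example</title></article>")
+#   assert xml_str.startswith('<?xml version="1.0" encoding="UTF-8"?>')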