From 826c7538e091fac14987a3cd654975da964e240 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Wed, 27 Oct 2021 18:50:17 -0700
Subject: make fmt (black 21.9b0)

---
 python/sandcrawler/__init__.py           |  52 ++-
 python/sandcrawler/db.py                 | 263 +++++++------
 python/sandcrawler/fileset_platforms.py  | 463 +++++++++++++----------
 python/sandcrawler/fileset_strategies.py | 171 +++++----
 python/sandcrawler/fileset_types.py      |   2 +
 python/sandcrawler/grobid.py             | 105 +++---
 python/sandcrawler/html.py               | 253 +++++++------
 python/sandcrawler/html_metadata.py      | 133 +++----
 python/sandcrawler/ia.py                 | 611 ++++++++++++++++++------------
 python/sandcrawler/ingest_file.py        | 477 +++++++++++++-----------
 python/sandcrawler/ingest_fileset.py     | 372 ++++++++++---------
 python/sandcrawler/ingest_html.py        | 139 ++++---
 python/sandcrawler/minio.py              |  52 +--
 python/sandcrawler/misc.py               | 115 +++---
 python/sandcrawler/pdfextract.py         | 147 ++++----
 python/sandcrawler/pdftrio.py            |  78 ++--
 python/sandcrawler/persist.py            | 450 ++++++++++++-----------
 python/sandcrawler/workers.py            | 289 ++++++++-------
 18 files changed, 2332 insertions(+), 1840 deletions(-)

(limited to 'python/sandcrawler')

diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index 46735eb..6718c57 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -1,16 +1,48 @@
 from .db import SandcrawlerPostgresClient, SandcrawlerPostgrestClient
 from .grobid import GrobidBlobWorker, GrobidClient, GrobidWorker
-from .ia import (CdxApiClient, CdxApiError, CdxPartial, CdxRow, PetaboxError, ResourceResult,
-                 SavePageNowClient, SavePageNowError, WarcResource, WaybackClient,
-                 WaybackContentError, WaybackError)
+from .ia import (
+    CdxApiClient,
+    CdxApiError,
+    CdxPartial,
+    CdxRow,
+    PetaboxError,
+    ResourceResult,
+    SavePageNowClient,
+    SavePageNowError,
+    WarcResource,
+    WaybackClient,
+    WaybackContentError,
+    WaybackError,
+)
 from .ingest_file import IngestFileWorker
 from .ingest_fileset import IngestFilesetWorker
-from .misc import (b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path,
-                   parse_cdx_datetime, parse_cdx_line)
+from .misc import (
+    b32_hex,
+    clean_url,
+    gen_file_metadata,
+    gen_file_metadata_path,
+    parse_cdx_datetime,
+    parse_cdx_line,
+)
 from .pdfextract import PdfExtractBlobWorker, PdfExtractWorker
 from .pdftrio import PdfTrioBlobWorker, PdfTrioClient, PdfTrioWorker
-from .persist import (PersistCdxWorker, PersistGrobidDiskWorker, PersistGrobidWorker,
-                      PersistIngestFileResultWorker, PersistIngestRequestWorker,
-                      PersistPdfTextWorker, PersistPdfTrioWorker, PersistThumbnailWorker)
-from .workers import (BlackholeSink, CdxLinePusher, JsonLinePusher, KafkaCompressSink,
-                      KafkaJsonPusher, KafkaSink, MultiprocessWrapper, ZipfilePusher)
+from .persist import (
+    PersistCdxWorker,
+    PersistGrobidDiskWorker,
+    PersistGrobidWorker,
+    PersistIngestFileResultWorker,
+    PersistIngestRequestWorker,
+    PersistPdfTextWorker,
+    PersistPdfTrioWorker,
+    PersistThumbnailWorker,
+)
+from .workers import (
+    BlackholeSink,
+    CdxLinePusher,
+    JsonLinePusher,
+    KafkaCompressSink,
+    KafkaJsonPusher,
+    KafkaSink,
+    MultiprocessWrapper,
+    ZipfilePusher,
+)
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index ee4d3bf..69d2116 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -12,12 +12,12 @@ class SandcrawlerPostgrestClient:
         self.api_url = api_url

     def get_cdx(self, url: str) -> Optional[dict]:
-        resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'
+ url)) + resp = requests.get(self.api_url + "/cdx", params=dict(url="eq." + url)) resp.raise_for_status() return resp.json() or None def get_grobid(self, sha1: str) -> Optional[dict]: - resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.' + sha1)) + resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex="eq." + sha1)) resp.raise_for_status() resp_json = resp.json() if resp_json: @@ -26,7 +26,7 @@ class SandcrawlerPostgrestClient: return None def get_pdftrio(self, sha1: str) -> Optional[dict]: - resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.' + sha1)) + resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex="eq." + sha1)) resp.raise_for_status() resp_json = resp.json() if resp_json: @@ -35,7 +35,7 @@ class SandcrawlerPostgrestClient: return None def get_pdf_meta(self, sha1: str) -> Optional[dict]: - resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.' + sha1)) + resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex="eq." + sha1)) resp.raise_for_status() resp_json = resp.json() if resp_json: @@ -56,7 +56,7 @@ class SandcrawlerPostgrestClient: return None def get_file_meta(self, sha1: str) -> Optional[dict]: - resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.' + sha1)) + resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex="eq." + sha1)) resp.raise_for_status() resp_json = resp.json() if resp_json: @@ -89,7 +89,7 @@ class SandcrawlerPostgrestClient: return None def get_crossref(self, doi: str) -> Optional[dict]: - resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.' + doi)) + resp = requests.get(self.api_url + "/crossref", params=dict(doi="eq." + doi)) resp.raise_for_status() resp_json = resp.json() if resp_json: @@ -117,10 +117,12 @@ class SandcrawlerPostgresClient: updates = 0 return (inserts, updates) - def insert_cdx(self, - cur: psycopg2.extensions.cursor, - batch: List[Dict[str, Any]], - on_conflict: str = "nothing") -> Tuple[int, int]: + def insert_cdx( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) @@ -133,11 +135,21 @@ class SandcrawlerPostgresClient: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [d for d in batch if d.get('warc_path')] + batch = [d for d in batch if d.get("warc_path")] if not batch: return (0, 0) - rows = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'], - int(d['warc_csize']), int(d['warc_offset'])) for d in batch] + rows = [ + ( + d["url"], + d["datetime"], + d["sha1hex"], + d["mimetype"], + d["warc_path"], + int(d["warc_csize"]), + int(d["warc_offset"]), + ) + for d in batch + ] # filter out duplicate rows by key (url, datetime) row_dict = dict() for b in rows: @@ -146,10 +158,12 @@ class SandcrawlerPostgresClient: resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_file_meta(self, - cur: psycopg2.extensions.cursor, - batch: List[Dict[str, Any]], - on_conflict: str = "nothing") -> Tuple[int, int]: + def insert_file_meta( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype) @@ -168,8 +182,10 @@ class 
SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - rows = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']), d['mimetype']) - for d in batch] + rows = [ + (d["sha1hex"], d["sha256hex"], d["md5hex"], int(d["size_bytes"]), d["mimetype"]) + for d in batch + ] # filter out duplicate rows by key (sha1hex) row_dict = dict() for b in rows: @@ -178,10 +194,12 @@ class SandcrawlerPostgresClient: resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_grobid(self, - cur: psycopg2.extensions.cursor, - batch: List[Dict[str, Any]], - on_conflict: str = "nothing") -> Tuple[int, int]: + def insert_grobid( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata) @@ -203,24 +221,27 @@ class SandcrawlerPostgresClient: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" for r in batch: - if r.get('metadata'): + if r.get("metadata"): # sometimes these are only in metadata; shouldn't pass through # though (to save database space) - dupe_fields = ('fatcat_release', 'grobid_version') + dupe_fields = ("fatcat_release", "grobid_version") for k in dupe_fields: if k not in r: - r[k] = r['metadata'].get(k) - r['metadata'].pop(k, None) - r['metadata'] = json.dumps(r['metadata'], sort_keys=True) - rows = [( - d['key'], - d.get('grobid_version') or None, - d['status_code'], - d['status'], - d.get('fatcat_release') or None, - d.get('updated') or datetime.datetime.now(), - d.get('metadata') or None, - ) for d in batch] + r[k] = r["metadata"].get(k) + r["metadata"].pop(k, None) + r["metadata"] = json.dumps(r["metadata"], sort_keys=True) + rows = [ + ( + d["key"], + d.get("grobid_version") or None, + d["status_code"], + d["status"], + d.get("fatcat_release") or None, + d.get("updated") or datetime.datetime.now(), + d.get("metadata") or None, + ) + for d in batch + ] # filter out duplicate rows by key (sha1hex) row_dict = dict() for b in rows: @@ -229,10 +250,9 @@ class SandcrawlerPostgresClient: resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_pdf_meta(self, - cur: psycopg2.extensions.cursor, - rows: List[Tuple], - on_conflict: str = "nothing") -> Tuple[int, int]: + def insert_pdf_meta( + self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing" + ) -> Tuple[int, int]: """ batch elements are expected to have .to_sql_tuple() method """ @@ -269,10 +289,9 @@ class SandcrawlerPostgresClient: resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_html_meta(self, - cur: psycopg2.extensions.cursor, - rows: List[Tuple], - on_conflict: str = "nothing") -> Tuple[int, int]: + def insert_html_meta( + self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing" + ) -> Tuple[int, int]: """ batch elements are expected to have .to_sql_tuple() method """ @@ -306,10 +325,12 @@ class SandcrawlerPostgresClient: resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_pdftrio(self, - cur: psycopg2.extensions.cursor, - 
batch: List[Dict[str, Any]], - on_conflict: str = "nothing") -> Tuple[int, int]: + def insert_pdftrio( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO pdftrio (sha1hex, updated, status_code, status, pdftrio_version, @@ -335,18 +356,21 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - rows = [( - d['key'], - d.get('updated') or datetime.datetime.now(), - d['status_code'], - d['status'], - d.get('versions', {}).get('pdftrio_version') or None, - d.get('versions', {}).get('models_date') or None, - d.get('ensemble_score'), - d.get('bert_score'), - d.get('linear_score'), - d.get('image_score'), - ) for d in batch] + rows = [ + ( + d["key"], + d.get("updated") or datetime.datetime.now(), + d["status_code"], + d["status"], + d.get("versions", {}).get("pdftrio_version") or None, + d.get("versions", {}).get("models_date") or None, + d.get("ensemble_score"), + d.get("bert_score"), + d.get("linear_score"), + d.get("image_score"), + ) + for d in batch + ] # filter out duplicate rows by key (sha1hex) row_dict = dict() for b in rows: @@ -355,10 +379,12 @@ class SandcrawlerPostgresClient: resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_ingest_request(self, - cur: psycopg2.extensions.cursor, - batch: List[Dict[str, Any]], - on_conflict: str = "nothing") -> Tuple[int, int]: + def insert_ingest_request( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request) @@ -372,21 +398,24 @@ class SandcrawlerPostgresClient: sql += " RETURNING xmax;" for r in batch: # in case these fields were already packed into 'request' - extra = r.get('request', {}) - for k in ('ext_ids', 'fatcat_release', 'edit_extra', 'rel'): + extra = r.get("request", {}) + for k in ("ext_ids", "fatcat_release", "edit_extra", "rel"): if r.get(k): extra[k] = r[k] if extra: - r['extra'] = json.dumps(extra, sort_keys=True) - rows = [( - d['link_source'], - d['link_source_id'], - d['ingest_type'], - d['base_url'], - d.get('ingest_request_source'), - d.get('release_stage') or None, - d.get('extra') or None, - ) for d in batch] + r["extra"] = json.dumps(extra, sort_keys=True) + rows = [ + ( + d["link_source"], + d["link_source_id"], + d["ingest_type"], + d["base_url"], + d.get("ingest_request_source"), + d.get("release_stage") or None, + d.get("extra") or None, + ) + for d in batch + ] # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url) row_dict = dict() for b in rows: @@ -395,10 +424,12 @@ class SandcrawlerPostgresClient: resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_ingest_file_result(self, - cur: psycopg2.extensions.cursor, - batch: List[Dict[str, Any]], - on_conflict: str = "nothing") -> Tuple[int, int]: + def insert_ingest_file_result( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex) @@ -420,16 +451,19 @@ 
class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - rows = [( - d['ingest_type'], - d['base_url'], - bool(d['hit']), - d['status'], - d.get('terminal_url'), - d.get('terminal_dt'), - d.get('terminal_status_code'), - d.get('terminal_sha1hex'), - ) for d in batch] + rows = [ + ( + d["ingest_type"], + d["base_url"], + bool(d["hit"]), + d["status"], + d.get("terminal_url"), + d.get("terminal_dt"), + d.get("terminal_status_code"), + d.get("terminal_sha1hex"), + ) + for d in batch + ] # filter out duplicate rows by key (ingest_type, base_url) row_dict = dict() for b in rows: @@ -438,10 +472,12 @@ class SandcrawlerPostgresClient: resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) - def insert_ingest_fileset_platform(self, - cur: psycopg2.extensions.cursor, - batch: List[Dict[str, Any]], - on_conflict: str = "nothing") -> Tuple[int, int]: + def insert_ingest_fileset_platform( + self, + cur: psycopg2.extensions.cursor, + batch: List[Dict[str, Any]], + on_conflict: str = "nothing", + ) -> Tuple[int, int]: sql = """ INSERT INTO ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest) @@ -470,23 +506,26 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - rows = [( - d['ingest_type'], - d['base_url'], - bool(d['hit']), - d['status'], - d.get('platform_name'), - d.get('platform_domain'), - d.get('platform_id'), - d.get('ingest_strategy'), - d.get('total_size'), - d.get('file_count'), - d.get('archiveorg_item_name'), - d.get('archiveorg_item_bundle_path'), - d.get('web_bundle_url'), - d.get('web_bundle_dt'), - d.get('manifest'), - ) for d in batch] + rows = [ + ( + d["ingest_type"], + d["base_url"], + bool(d["hit"]), + d["status"], + d.get("platform_name"), + d.get("platform_domain"), + d.get("platform_id"), + d.get("ingest_strategy"), + d.get("total_size"), + d.get("file_count"), + d.get("archiveorg_item_name"), + d.get("archiveorg_item_bundle_path"), + d.get("web_bundle_url"), + d.get("web_bundle_dt"), + d.get("manifest"), + ) + for d in batch + ] # filter out duplicate rows by key (ingest_type, base_url) row_dict = dict() for b in rows: diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py index 86e3ff2..fbe8066 100644 --- a/python/sandcrawler/fileset_platforms.py +++ b/python/sandcrawler/fileset_platforms.py @@ -4,25 +4,38 @@ from typing import Optional, Tuple import internetarchive import requests -from sandcrawler.fileset_types import (FilesetManifestFile, FilesetPlatformItem, IngestStrategy, - PlatformRestrictedError, PlatformScopeError) +from sandcrawler.fileset_types import ( + FilesetManifestFile, + FilesetPlatformItem, + IngestStrategy, + PlatformRestrictedError, + PlatformScopeError, +) from sandcrawler.html_metadata import BiblioMetadata from sandcrawler.ia import ResourceResult -class FilesetPlatformHelper(): +class FilesetPlatformHelper: def __init__(self): - self.platform_name = 'unknown' - - def match_request(self, request: dict, resource: Optional[ResourceResult], - html_biblio: Optional[BiblioMetadata]) -> bool: + self.platform_name = "unknown" + + def match_request( + self, + request: dict, + resource: 
Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata], + ) -> bool: """ Does this request look like it matches this platform? """ raise NotImplementedError() - def process_request(self, request: dict, resource: Optional[ResourceResult], - html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request( + self, + request: dict, + resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata], + ) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -47,7 +60,7 @@ class FilesetPlatformHelper(): class DataverseHelper(FilesetPlatformHelper): def __init__(self): super().__init__() - self.platform_name = 'dataverse' + self.platform_name = "dataverse" self.session = requests.Session() @staticmethod @@ -69,16 +82,16 @@ class DataverseHelper(FilesetPlatformHelper): If there is an error parsing, raises a ValueError """ id_type = None - if pid.startswith('doi:10.'): - id_type = 'doi' + if pid.startswith("doi:10."): + id_type = "doi" pid = pid[4:] - elif pid.startswith('hdl:'): - id_type = 'hdl' + elif pid.startswith("hdl:"): + id_type = "hdl" pid = pid[4:] else: raise ValueError(f"unknown dataverse persistentId format: {pid}") - comp = pid.split('/') + comp = pid.split("/") if len(comp) < 2: raise ValueError(f"unknown dataverse persistentId format: {pid}") @@ -114,19 +127,23 @@ class DataverseHelper(FilesetPlatformHelper): "file_id": file_id, } - def match_request(self, request: dict, resource: Optional[ResourceResult], - html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request( + self, + request: dict, + resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata], + ) -> bool: if resource and resource.terminal_url: url = resource.terminal_url else: - url = request['base_url'] + url = request["base_url"] # TODO: could also do HTML platform detection or something? components = urllib.parse.urlparse(url) # platform_domain = components.netloc.split(':')[0].lower() params = urllib.parse.parse_qs(components.query) - id_param = params.get('persistentId') + id_param = params.get("persistentId") if not id_param: return False platform_id = id_param[0] @@ -138,8 +155,12 @@ class DataverseHelper(FilesetPlatformHelper): return True - def process_request(self, request: dict, resource: Optional[ResourceResult], - html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request( + self, + request: dict, + resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata], + ) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) @@ -150,17 +171,17 @@ class DataverseHelper(FilesetPlatformHelper): if resource and resource.terminal_url: url = resource.terminal_url else: - url = request['base_url'] + url = request["base_url"] # 1. 
extract domain, PID, and version from URL components = urllib.parse.urlparse(url) - platform_domain = components.netloc.split(':')[0].lower() + platform_domain = components.netloc.split(":")[0].lower() params = urllib.parse.parse_qs(components.query) - id_param = params.get('persistentId') + id_param = params.get("persistentId") if not (id_param and id_param[0]): raise PlatformScopeError("Expected a Dataverse persistentId in URL") platform_id = id_param[0] - version_param = params.get('version') + version_param = params.get("version") dataset_version = None if version_param: dataset_version = version_param[0] @@ -170,10 +191,11 @@ class DataverseHelper(FilesetPlatformHelper): except ValueError: raise PlatformScopeError("not actually in scope") - if parsed_id['file_id']: + if parsed_id["file_id"]: # TODO: maybe we could support this? raise PlatformScopeError( - "only entire dataverse datasets can be archived with this tool") + "only entire dataverse datasets can be archived with this tool" + ) # 1b. if we didn't get a version number from URL, fetch it from API if not dataset_version: @@ -182,8 +204,10 @@ class DataverseHelper(FilesetPlatformHelper): ) resp.raise_for_status() obj = resp.json() - obj_latest = obj['data']['latestVersion'] - dataset_version = f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}" + obj_latest = obj["data"]["latestVersion"] + dataset_version = ( + f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}" + ) # 2. API fetch resp = self.session.get( @@ -192,69 +216,72 @@ class DataverseHelper(FilesetPlatformHelper): resp.raise_for_status() obj = resp.json() - obj_latest = obj['data']['latestVersion'] - assert dataset_version == f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}" - assert platform_id == obj_latest['datasetPersistentId'] + obj_latest = obj["data"]["latestVersion"] + assert ( + dataset_version + == f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}" + ) + assert platform_id == obj_latest["datasetPersistentId"] manifest = [] - for row in obj_latest['files']: - df = row['dataFile'] - df_persistent_id = df['persistentId'] + for row in obj_latest["files"]: + df = row["dataFile"] + df_persistent_id = df["persistentId"] platform_url = f"https://{platform_domain}/api/access/datafile/:persistentId/?persistentId={df_persistent_id}" - if df.get('originalFileName'): - platform_url += '&format=original' + if df.get("originalFileName"): + platform_url += "&format=original" extra = dict() # TODO: always save the version field? 
- if row.get('version') != 1: - extra['version'] = row['version'] - if 'description' in df: - extra['description'] = df['description'] + if row.get("version") != 1: + extra["version"] = row["version"] + if "description" in df: + extra["description"] = df["description"] manifest.append( FilesetManifestFile( - path=df.get('originalFileName') or df['filename'], - size=df.get('originalFileSize') or df['filesize'], - md5=df['md5'], + path=df.get("originalFileName") or df["filename"], + size=df.get("originalFileSize") or df["filesize"], + md5=df["md5"], # NOTE: don't get: sha1, sha256 - mimetype=df['contentType'], + mimetype=df["contentType"], platform_url=platform_url, extra=extra or None, - )) + ) + ) - platform_sub_id = platform_id.split('/')[-1] + platform_sub_id = platform_id.split("/")[-1] archiveorg_item_name = f"{platform_domain}-{platform_sub_id}-v{dataset_version}" archiveorg_item_meta = dict( # TODO: collection=platform_domain, collection="datasets", - date=obj_latest['releaseTime'].split('T')[0], - source= - f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}", + date=obj_latest["releaseTime"].split("T")[0], + source=f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}", ) - if platform_id.startswith('doi:10.'): - archiveorg_item_meta['doi'] = platform_id.replace('doi:', '') - for block in obj_latest['metadataBlocks']['citation']['fields']: - if block['typeName'] == 'title': - archiveorg_item_meta['title'] = block['value'] - elif block['typeName'] == 'depositor': - archiveorg_item_meta['creator'] = block['value'] - elif block['typeName'] == 'dsDescription': - archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue'][ - 'value'] - - archiveorg_item_meta['description'] = archiveorg_item_meta.get('description', '') - if obj_latest.get('termsOfUse'): - archiveorg_item_meta['description'] += '\n
\n' + obj_latest['termsOfUse'] + if platform_id.startswith("doi:10."): + archiveorg_item_meta["doi"] = platform_id.replace("doi:", "") + for block in obj_latest["metadataBlocks"]["citation"]["fields"]: + if block["typeName"] == "title": + archiveorg_item_meta["title"] = block["value"] + elif block["typeName"] == "depositor": + archiveorg_item_meta["creator"] = block["value"] + elif block["typeName"] == "dsDescription": + archiveorg_item_meta["description"] = block["value"][0]["dsDescriptionValue"][ + "value" + ] + + archiveorg_item_meta["description"] = archiveorg_item_meta.get("description", "") + if obj_latest.get("termsOfUse"): + archiveorg_item_meta["description"] += "\n
\n" + obj_latest["termsOfUse"] return FilesetPlatformItem( platform_name=self.platform_name, - platform_status='success', + platform_status="success", manifest=manifest, platform_domain=platform_domain, platform_id=platform_id, archiveorg_item_name=archiveorg_item_name, archiveorg_item_meta=archiveorg_item_meta, - web_bundle_url= - f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original", + web_bundle_url=f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original", # TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files) extra=dict(version=dataset_version), ) @@ -301,7 +328,7 @@ def test_parse_dataverse_persistentid() -> None: } invalid = [ - #"doi:10.5072/FK2/J8SJZB/LL6WXZ", + # "doi:10.5072/FK2/J8SJZB/LL6WXZ", "doi:10.25625/abcd", "other:10.25625/LL6WXZ", "10.25625/LL6WXZ", @@ -322,7 +349,7 @@ def test_parse_dataverse_persistentid() -> None: class FigshareHelper(FilesetPlatformHelper): def __init__(self): super().__init__() - self.platform_name = 'figshare' + self.platform_name = "figshare" self.session = requests.Session() @staticmethod @@ -337,13 +364,13 @@ class FigshareHelper(FilesetPlatformHelper): # eg: /articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1 # /articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4 - comp = path.split('/') - if len(comp) < 4 or comp[1] != 'articles': + comp = path.split("/") + if len(comp) < 4 or comp[1] != "articles": raise ValueError(f"not a figshare URL: {path}") comp = comp[2:] if comp[0] in [ - 'dataset', + "dataset", ]: comp = comp[1:] @@ -354,19 +381,23 @@ class FigshareHelper(FilesetPlatformHelper): else: raise ValueError(f"couldn't find figshare identiier: {path}") - def match_request(self, request: dict, resource: Optional[ResourceResult], - html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request( + self, + request: dict, + resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata], + ) -> bool: if resource and resource.terminal_url: url = resource.terminal_url else: - url = request['base_url'] + url = request["base_url"] components = urllib.parse.urlparse(url) - platform_domain = components.netloc.split(':')[0].lower() + platform_domain = components.netloc.split(":")[0].lower() # only work with full, versioned figshare.com URLs - if 'figshare.com' not in platform_domain: + if "figshare.com" not in platform_domain: return False try: @@ -380,8 +411,12 @@ class FigshareHelper(FilesetPlatformHelper): return False - def process_request(self, request: dict, resource: Optional[ResourceResult], - html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request( + self, + request: dict, + resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata], + ) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -389,15 +424,16 @@ class FigshareHelper(FilesetPlatformHelper): if resource and resource.terminal_url: url = resource.terminal_url else: - url = request['base_url'] + url = request["base_url"] # 1. 
extract domain, PID, and version from URL components = urllib.parse.urlparse(url) - platform_domain = components.netloc.split(':')[0].lower() + platform_domain = components.netloc.split(":")[0].lower() (platform_id, dataset_version) = self.parse_figshare_url_path(components.path) assert platform_id.isdigit(), f"expected numeric: {platform_id}" - assert dataset_version and dataset_version.isdigit( + assert ( + dataset_version and dataset_version.isdigit() ), f"expected numeric: {dataset_version}" # 1b. if we didn't get a version number from URL, fetch it from API @@ -405,59 +441,60 @@ class FigshareHelper(FilesetPlatformHelper): # 2. API fetch resp = self.session.get( - f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}") + f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}" + ) resp.raise_for_status() obj = resp.json() # figshare_type = obj['defined_type_name'] - if not obj['is_public']: - raise PlatformRestrictedError(f'record not public: {platform_id} {dataset_version}') - if obj['is_embargoed']: + if not obj["is_public"]: + raise PlatformRestrictedError(f"record not public: {platform_id} {dataset_version}") + if obj["is_embargoed"]: raise PlatformRestrictedError( f'record is embargoed: {obj.get("embargo_title")} ({platform_id} {dataset_version})' ) manifest = [] - for row in obj['files']: + for row in obj["files"]: manifest.append( FilesetManifestFile( - path=row['name'], - size=row['size'], - md5=row['computed_md5'], + path=row["name"], + size=row["size"], + md5=row["computed_md5"], # NOTE: don't get: sha1, sha256, mimetype - platform_url=row['download_url'], - #extra=dict(), - )) - assert not row.get('is_link_only') + platform_url=row["download_url"], + # extra=dict(), + ) + ) + assert not row.get("is_link_only") authors = [] - for author in obj['authors']: - authors.append(author['full_name']) + for author in obj["authors"]: + authors.append(author["full_name"]) archiveorg_item_name = f"{platform_domain}-{platform_id}-v{dataset_version}" archiveorg_item_meta = dict( # TODO: collection=platform_domain, collection="datasets", creator=authors, - doi=obj['doi'], - title=obj['title'], - date=obj['published_date'], - source=obj['url_public_html'], - description=obj['description'], - license=obj['license']['url'], - version=obj['version'], + doi=obj["doi"], + title=obj["title"], + date=obj["published_date"], + source=obj["url_public_html"], + description=obj["description"], + license=obj["license"]["url"], + version=obj["version"], ) return FilesetPlatformItem( platform_name=self.platform_name, - platform_status='success', + platform_status="success", manifest=manifest, platform_domain=platform_domain, platform_id=platform_id, archiveorg_item_name=archiveorg_item_name, archiveorg_item_meta=archiveorg_item_meta, - web_bundle_url= - f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}", + web_bundle_url=f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}", # TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files) extra=dict(version=dataset_version), ) @@ -466,13 +503,19 @@ class FigshareHelper(FilesetPlatformHelper): def test_parse_figshare_url_path() -> None: valid = { - "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1": - ("8987858", "1"), - "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858": - ("8987858", 
None), + "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1": ( + "8987858", + "1", + ), + "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858": ( + "8987858", + None, + ), "/articles/CIBERSORT_p-value_0_05/8217188/1": ("8217188", "1"), - "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4": - ("12127176", "4"), + "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4": ( + "12127176", + "4", + ), } invalid = [ @@ -493,25 +536,33 @@ def test_parse_figshare_url_path() -> None: class ZenodoHelper(FilesetPlatformHelper): def __init__(self): super().__init__() - self.platform_name = 'zenodo' + self.platform_name = "zenodo" self.session = requests.Session() - def match_request(self, request: dict, resource: Optional[ResourceResult], - html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request( + self, + request: dict, + resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata], + ) -> bool: if resource and resource.terminal_url: url = resource.terminal_url else: - url = request['base_url'] + url = request["base_url"] components = urllib.parse.urlparse(url) - platform_domain = components.netloc.split(':')[0].lower() - if platform_domain == 'zenodo.org' and '/record/' in components.path: + platform_domain = components.netloc.split(":")[0].lower() + if platform_domain == "zenodo.org" and "/record/" in components.path: return True return False - def process_request(self, request: dict, resource: Optional[ResourceResult], - html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request( + self, + request: dict, + resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata], + ) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -519,7 +570,7 @@ class ZenodoHelper(FilesetPlatformHelper): if resource and resource.terminal_url: url = resource.terminal_url else: - url = request['base_url'] + url = request["base_url"] # TODO: also look in base_url and resource-non-terminal for ident? to # check for work-level redirects @@ -527,118 +578,118 @@ class ZenodoHelper(FilesetPlatformHelper): # 1. extract identifier from URL # eg: https://zenodo.org/record/5230255 components = urllib.parse.urlparse(url) - platform_domain = components.netloc.split(':')[0].lower() - if len(components.path.split('/')) < 2: + platform_domain = components.netloc.split(":")[0].lower() + if len(components.path.split("/")) < 2: raise PlatformScopeError("Expected a complete, versioned figshare URL") - platform_id = components.path.split('/')[2] + platform_id = components.path.split("/")[2] assert platform_id.isdigit(), f"expected numeric: {platform_id}" - if 'zenodo.org' not in platform_domain: + if "zenodo.org" not in platform_domain: raise PlatformScopeError(f"unexpected zenodo.org domain: {platform_domain}") # 2. 
API fetch resp = self.session.get(f"https://zenodo.org/api/records/{platform_id}") if resp.status_code == 410: - raise PlatformRestrictedError('record deleted') + raise PlatformRestrictedError("record deleted") resp.raise_for_status() obj = resp.json() - assert obj['id'] == int(platform_id) - work_id = obj['conceptrecid'] - if work_id == obj['id']: + assert obj["id"] == int(platform_id) + work_id = obj["conceptrecid"] + if work_id == obj["id"]: raise PlatformScopeError( - "got a work-level zenodo record, not a versioned record: {work_id}") + "got a work-level zenodo record, not a versioned record: {work_id}" + ) # zenodo_type = obj['metadata']['resource_type']['type'] - if obj['metadata']['access_right'] != 'open': + if obj["metadata"]["access_right"] != "open": raise PlatformRestrictedError( "not publicly available ({obj['metadata']['access_right']}): {platform_domain} {platform_id}" ) manifest = [] - for row in obj['files']: + for row in obj["files"]: mf = FilesetManifestFile( - path=row['key'], - size=row['size'], - platform_url=row['links']['self'], - #extra=dict(), + path=row["key"], + size=row["size"], + platform_url=row["links"]["self"], + # extra=dict(), ) - checksum = row['checksum'] + checksum = row["checksum"] # eg: md5:35ffcab905f8224556dba76648cb7dad - if checksum.startswith('md5:'): + if checksum.startswith("md5:"): mf.md5 = checksum[4:] - elif checksum.startswith('sha1:'): + elif checksum.startswith("sha1:"): mf.sha1 = checksum[45] manifest.append(mf) authors = [] - for author in obj['metadata']['creators']: - authors.append(author['name']) + for author in obj["metadata"]["creators"]: + authors.append(author["name"]) archiveorg_item_name = f"{platform_domain}-{platform_id}" archiveorg_item_meta = dict( # TODO: collection=platform_domain, collection="datasets", creator=authors, - doi=obj['doi'], - title=obj['metadata']['title'], - date=obj['metadata']['publication_date'], - source=obj['links']['html'], - description=obj['metadata']['description'], - license=obj['metadata']['license']['id'], - version=obj['revision'], + doi=obj["doi"], + title=obj["metadata"]["title"], + date=obj["metadata"]["publication_date"], + source=obj["links"]["html"], + description=obj["metadata"]["description"], + license=obj["metadata"]["license"]["id"], + version=obj["revision"], # obj['metadata']['version'] is, eg, git version tag ) return FilesetPlatformItem( platform_name=self.platform_name, - platform_status='success', + platform_status="success", manifest=manifest, platform_domain=platform_domain, platform_id=platform_id, archiveorg_item_name=archiveorg_item_name, archiveorg_item_meta=archiveorg_item_meta, - #web_bundle_url=f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}", + # web_bundle_url=f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}", # TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files) - extra=dict(version=obj['revision']), + extra=dict(version=obj["revision"]), ) class ArchiveOrgHelper(FilesetPlatformHelper): FORMAT_TO_MIMETYPE = { - 'BZIP': 'application/x-bzip', - 'BZIP2': 'application/x-bzip2', - 'ZIP': 'application/zip', - 'GZIP': 'application/gzip', - 'RAR': 'application/vnd.rar', - 'TAR': 'application/x-tar', - '7z': 'application/x-7z-compressed', - 'HTML': 'text/html', - 'Text': 'text/plain', - 'PDF': 'application/pdf', - 'CSV': 'text/csv', - 'XML': 'application/xml', - 'JSON': 'application/json', - + "BZIP": "application/x-bzip", + "BZIP2": "application/x-bzip2", + "ZIP": 
"application/zip", + "GZIP": "application/gzip", + "RAR": "application/vnd.rar", + "TAR": "application/x-tar", + "7z": "application/x-7z-compressed", + "HTML": "text/html", + "Text": "text/plain", + "PDF": "application/pdf", + "CSV": "text/csv", + "XML": "application/xml", + "JSON": "application/json", #'application/msword (.doc)', # .doc #'application/vnd.openxmlformats-officedocument.wordprocessingml.document', # .docx #'application/vnd.ms-excel', # .xls #'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx - 'MP3': 'audio/mpeg', # .mp3 - 'MP4': 'video/mp4', # .mp4 - 'MPEG': 'video/mpeg', # .mpeg - 'JPEG': 'image/jpeg', - 'GIF': 'image/gif', - 'PNG': 'image/png', - 'TIFF': 'image/tiff', - 'Unknown': None, + "MP3": "audio/mpeg", # .mp3 + "MP4": "video/mp4", # .mp4 + "MPEG": "video/mpeg", # .mpeg + "JPEG": "image/jpeg", + "GIF": "image/gif", + "PNG": "image/png", + "TIFF": "image/tiff", + "Unknown": None, } def __init__(self): super().__init__() - self.platform_name = 'archiveorg' + self.platform_name = "archiveorg" self.session = internetarchive.get_session() @staticmethod @@ -646,69 +697,79 @@ class ArchiveOrgHelper(FilesetPlatformHelper): """ Filters IA API files """ - if f.source != 'original': + if f.source != "original": return False for suffix in [ - '_meta.sqlite', - '_archive.torrent', - '_itemimage.jpg', - '_meta.xml', - '_thumb.png', - '_files.xml', + "_meta.sqlite", + "_archive.torrent", + "_itemimage.jpg", + "_meta.xml", + "_thumb.png", + "_files.xml", ]: if f.name == item_name + suffix or f.name == item_name.lower() + suffix: return False - if f.name.startswith('_'): + if f.name.startswith("_"): return False - if item_name.startswith('academictorrents_'): + if item_name.startswith("academictorrents_"): for suffix in [ - '_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib' + "_academictorrents.torrent", + "_academictorrents_torrent.txt", + ".bib", ]: if f.name == item_name + suffix: return False return True - def match_request(self, request: dict, resource: Optional[ResourceResult], - html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request( + self, + request: dict, + resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata], + ) -> bool: if resource and resource.terminal_url: url = resource.terminal_url else: - url = request['base_url'] + url = request["base_url"] patterns = [ - '://archive.org/details/', - '://archive.org/download/', + "://archive.org/details/", + "://archive.org/download/", ] for p in patterns: if p in url: return True return False - def process_request(self, request: dict, resource: Optional[ResourceResult], - html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request( + self, + request: dict, + resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata], + ) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ - base_url_split = request['base_url'].split('/') - #print(base_url_split, file=sys.stderr) + base_url_split = request["base_url"].split("/") + # print(base_url_split, file=sys.stderr) assert len(base_url_split) in [5, 6] - assert base_url_split[0] in ['http:', 'https:'] - assert base_url_split[2] == 'archive.org' - assert base_url_split[3] in ['details', 'download'] + assert base_url_split[0] in ["http:", "https:"] + assert base_url_split[2] == "archive.org" + assert base_url_split[3] in ["details", "download"] item_name = base_url_split[4] if len(base_url_split) == 6 and base_url_split[5]: 
raise PlatformScopeError( "got an archive.org file path, not download/details page; individual files not handled yet" ) - #print(f" archiveorg processing item={item_name}", file=sys.stderr) + # print(f" archiveorg processing item={item_name}", file=sys.stderr) item = self.session.get_item(item_name) item_name = item.identifier - item_collection = item.metadata['collection'] + item_collection = item.metadata["collection"] if type(item_collection) == list: item_collection = item_collection[0] - assert item.metadata['mediatype'] not in ['collection', 'web'] + assert item.metadata["mediatype"] not in ["collection", "web"] item_files = item.get_files(on_the_fly=False) item_files = [f for f in item_files if self.want_item_file(f, item_name)] manifest = [] @@ -727,9 +788,9 @@ class ArchiveOrgHelper(FilesetPlatformHelper): return FilesetPlatformItem( platform_name=self.platform_name, - platform_status='success', + platform_status="success", manifest=manifest, - platform_domain='archive.org', + platform_domain="archive.org", platform_id=item_name, archiveorg_item_name=item_name, archiveorg_meta=dict(collection=item_collection), diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py index 9d3bae3..6dc77f9 100644 --- a/python/sandcrawler/fileset_strategies.py +++ b/python/sandcrawler/fileset_strategies.py @@ -5,15 +5,19 @@ from typing import Optional import internetarchive -from sandcrawler.fileset_types import (ArchiveStrategyResult, FilesetPlatformItem, - IngestStrategy, PlatformScopeError) +from sandcrawler.fileset_types import ( + ArchiveStrategyResult, + FilesetPlatformItem, + IngestStrategy, + PlatformScopeError, +) from sandcrawler.ia import SavePageNowClient, WaybackClient, fix_transfer_encoding from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path, sanitize_fs_path -class FilesetIngestStrategy(): +class FilesetIngestStrategy: def __init__(self): - #self.ingest_strategy = 'unknown' + # self.ingest_strategy = 'unknown' self.success_status = "success" def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]: @@ -29,8 +33,8 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): self.ingest_strategy = IngestStrategy.ArchiveorgFileset # TODO: enable cleanup when confident (eg, safe path parsing) - self.skip_cleanup_local_files = kwargs.get('skip_cleanup_local_files', True) - self.working_dir = os.environ.get('SANDCRAWLER_WORKING_DIR', '/tmp/sandcrawler/') + self.skip_cleanup_local_files = kwargs.get("skip_cleanup_local_files", True) + self.working_dir = os.environ.get("SANDCRAWLER_WORKING_DIR", "/tmp/sandcrawler/") try: os.mkdir(self.working_dir) except FileExistsError: @@ -53,23 +57,29 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): found = False for existing in item_files: if existing.name == wanted.path: - if ((existing.sha1 and existing.sha1 == wanted.sha1) or - (existing.md5 and existing.md5 == wanted.md5) - ) and existing.name == wanted.path and existing.size == wanted.size: + if ( + ( + (existing.sha1 and existing.sha1 == wanted.sha1) + or (existing.md5 and existing.md5 == wanted.md5) + ) + and existing.name == wanted.path + and existing.size == wanted.size + ): found = True - wanted.status = 'exists' + wanted.status = "exists" break else: - wanted.status = 'mismatch-existing' + wanted.status = "mismatch-existing" break if not found: print( f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}", - file=sys.stderr) + file=sys.stderr, + ) return None 
return ArchiveStrategyResult( ingest_strategy=self.ingest_strategy, - status='success-existing', + status="success-existing", manifest=item.manifest, ) @@ -81,12 +91,12 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): if existing: return existing - if item.platform_name == 'archiveorg': + if item.platform_name == "archiveorg": raise PlatformScopeError("should't download archive.org into itself") local_dir = self.working_dir + item.archiveorg_item_name - assert local_dir.startswith('/') - assert local_dir.count('/') > 2 + assert local_dir.startswith("/") + assert local_dir.count("/") > 2 try: os.mkdir(local_dir) except FileExistsError: @@ -96,71 +106,80 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): assert item.manifest for m in item.manifest: if m.path != sanitize_fs_path(m.path): - m.status = 'unsafe-path' + m.status = "unsafe-path" continue - local_path = local_dir + '/' + m.path + local_path = local_dir + "/" + m.path assert m.platform_url if not os.path.exists(local_path): print(f" downloading {m.path}", file=sys.stderr) - with self.ia_session.get(m.platform_url, stream=True, - allow_redirects=True) as r: + with self.ia_session.get( + m.platform_url, stream=True, allow_redirects=True + ) as r: r.raise_for_status() - with open(local_path + '.partial', 'wb') as f: + with open(local_path + ".partial", "wb") as f: for chunk in r.iter_content(chunk_size=256 * 1024): f.write(chunk) - os.rename(local_path + '.partial', local_path) - m.status = 'downloaded-local' + os.rename(local_path + ".partial", local_path) + m.status = "downloaded-local" else: - m.status = 'exists-local' + m.status = "exists-local" print(f" verifying {m.path}", file=sys.stderr) file_meta = gen_file_metadata_path(local_path, allow_empty=True) - assert file_meta[ - 'size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}" + assert ( + file_meta["size_bytes"] == m.size + ), f"expected: {m.size} found: {file_meta['size_bytes']}" if m.sha1: - assert file_meta['sha1hex'] == m.sha1 + assert file_meta["sha1hex"] == m.sha1 else: - m.sha1 = file_meta['sha1hex'] + m.sha1 = file_meta["sha1hex"] if m.sha256: - assert file_meta['sha256hex'] == m.sha256 + assert file_meta["sha256hex"] == m.sha256 else: - m.sha256 = file_meta['sha256hex'] + m.sha256 = file_meta["sha256hex"] if m.md5: - assert file_meta['md5hex'] == m.md5 + assert file_meta["md5hex"] == m.md5 else: - m.md5 = file_meta['md5hex'] + m.md5 = file_meta["md5hex"] if m.mimetype: # 'magic' isn't good and parsing more detailed text file formats like text/csv - if file_meta['mimetype'] != m.mimetype and file_meta['mimetype'] != 'text/plain': + if ( + file_meta["mimetype"] != m.mimetype + and file_meta["mimetype"] != "text/plain" + ): # these 'tab-separated-values' from dataverse are just noise, don't log them - if m.mimetype != 'text/tab-separated-values': + if m.mimetype != "text/tab-separated-values": print( f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}", - file=sys.stderr) - m.mimetype = file_meta['mimetype'] + file=sys.stderr, + ) + m.mimetype = file_meta["mimetype"] else: - m.mimetype = file_meta['mimetype'] - m.status = 'verified-local' + m.mimetype = file_meta["mimetype"] + m.status = "verified-local" # 2. 
upload all files, with metadata - assert item.archiveorg_item_meta and item.archiveorg_item_meta['collection'] + assert item.archiveorg_item_meta and item.archiveorg_item_meta["collection"] item_files = [] for m in item.manifest: - local_path = local_dir + '/' + m.path - item_files.append({ - 'name': local_path, - 'remote_name': m.path, - }) + local_path = local_dir + "/" + m.path + item_files.append( + { + "name": local_path, + "remote_name": m.path, + } + ) print( f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...", - file=sys.stderr) + file=sys.stderr, + ) internetarchive.upload( item.archiveorg_item_name, files=item_files, @@ -171,7 +190,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): ) for m in item.manifest: - m.status = 'success' + m.status = "success" # 4. delete local directory if not self.skip_cleanup_local_files: @@ -191,6 +210,7 @@ class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy): ArchiveorgFilesetStrategy currently works fine with individual files. Just need to over-ride the ingest_strategy name. """ + def __init__(self): super().__init__() self.ingest_strategy = IngestStrategy.ArchiveorgFileset @@ -204,7 +224,8 @@ class WebFilesetStrategy(FilesetIngestStrategy): self.wayback_client = WaybackClient() self.try_spn2 = True self.spn_client = SavePageNowClient( - spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) + spn_cdx_retry_sec=kwargs.get("spn_cdx_retry_sec", 9.0) + ) self.max_spn_manifest = 20 def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult: @@ -222,25 +243,31 @@ class WebFilesetStrategy(FilesetIngestStrategy): fetch_url = m.platform_url if not fetch_url: raise NotImplementedError( - "require 'platform_url' for each file when doing Web fetching") + "require 'platform_url' for each file when doing Web fetching" + ) via = "wayback" resource = self.wayback_client.lookup_resource(fetch_url, m.mimetype) - if self.try_spn2 and (resource is None or - (resource and resource.status == 'no-capture')): + if self.try_spn2 and ( + resource is None or (resource and resource.status == "no-capture") + ): if len(item.manifest) > self.max_spn_manifest: - m.status = 'too-much-spn' + m.status = "too-much-spn" continue via = "spn2" - resource = self.spn_client.crawl_resource(fetch_url, - self.wayback_client, - force_simple_get=True) - - print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status), - (resource and resource.terminal_url) - or fetch_url), - file=sys.stderr) + resource = self.spn_client.crawl_resource( + fetch_url, self.wayback_client, force_simple_get=True + ) + + print( + "[FETCH {:>6}] {} {}".format( + via, + (resource and resource.status), + (resource and resource.terminal_url) or fetch_url, + ), + file=sys.stderr, + ) m.terminal_url = resource.terminal_url m.terminal_dt = resource.terminal_dt @@ -248,7 +275,7 @@ class WebFilesetStrategy(FilesetIngestStrategy): if self.ingest_strategy == "web-file": file_resource = resource - if resource.status != 'success': + if resource.status != "success": continue else: assert resource.terminal_status_code == 200 @@ -259,24 +286,26 @@ class WebFilesetStrategy(FilesetIngestStrategy): if self.ingest_strategy == "web-file": file_file_meta = file_meta - if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex'] - ) or (m.sha1 - and m.sha1 != file_meta['sha1hex']): - m.status = 'mismatch' + if ( + file_meta["size_bytes"] != m.size + or (m.md5 and m.md5 != file_meta["md5hex"]) + or (m.sha1 and m.sha1 != 
file_meta["sha1hex"]) + ): + m.status = "mismatch" continue - m.md5 = m.md5 or file_meta['md5hex'] - m.sha1 = m.sha1 or file_meta['md5hex'] - m.sha256 = m.sha256 or file_meta['sha256hex'] - m.mimetype = m.mimetype or file_meta['mimetype'] + m.md5 = m.md5 or file_meta["md5hex"] + m.sha1 = m.sha1 or file_meta["md5hex"] + m.sha256 = m.sha256 or file_meta["sha256hex"] + m.mimetype = m.mimetype or file_meta["mimetype"] overall_status = self.success_status for m in item.manifest: - if m.status != 'success': - overall_status = m.status or 'not-processed' + if m.status != "success": + overall_status = m.status or "not-processed" break if not item.manifest: - overall_status = 'empty-manifest' + overall_status = "empty-manifest" result = ArchiveStrategyResult( ingest_strategy=self.ingest_strategy, diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py index f543ede..3398833 100644 --- a/python/sandcrawler/fileset_types.py +++ b/python/sandcrawler/fileset_types.py @@ -62,6 +62,7 @@ class PlatformScopeError(Exception): - a 'latest version' record, when the platform has version-specific records - a single file within a dataset for a platform which has file-level identifiers """ + pass @@ -69,4 +70,5 @@ class PlatformRestrictedError(Exception): """ When datasets are not publicly available on a platform (yet) """ + pass diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py index 67aca17..26918f6 100644 --- a/python/sandcrawler/grobid.py +++ b/python/sandcrawler/grobid.py @@ -12,11 +12,11 @@ from .workers import SandcrawlerFetchWorker, SandcrawlerWorker class GrobidClient(object): def __init__(self, host_url: str = "http://grobid.qa.fatcat.wiki", **kwargs): self.host_url = host_url - self.consolidate_mode = int(kwargs.get('consolidate_mode', 0)) + self.consolidate_mode = int(kwargs.get("consolidate_mode", 0)) - def process_fulltext(self, - blob: bytes, - consolidate_mode: Optional[int] = None) -> Dict[str, Any]: + def process_fulltext( + self, blob: bytes, consolidate_mode: Optional[int] = None + ) -> Dict[str, Any]: """ Returns dict with keys: - status_code @@ -36,72 +36,75 @@ class GrobidClient(object): grobid_response = requests.post( self.host_url + "/api/processFulltextDocument", files={ - 'input': blob, - 'consolidateHeader': consolidate_mode, - 'consolidateCitations': 0, # too expensive for now - 'includeRawCitations': 1, + "input": blob, + "consolidateHeader": consolidate_mode, + "consolidateCitations": 0, # too expensive for now + "includeRawCitations": 1, }, timeout=180.0, ) except requests.Timeout: return { - 'status': 'error-timeout', - 'status_code': -4, # heritrix3 "HTTP timeout" code - 'error_msg': 'GROBID request (HTTP POST) timeout', + "status": "error-timeout", + "status_code": -4, # heritrix3 "HTTP timeout" code + "error_msg": "GROBID request (HTTP POST) timeout", } info: Dict[str, Any] = dict(status_code=grobid_response.status_code) if grobid_response.status_code == 200: - info['status'] = 'success' - info['tei_xml'] = grobid_response.text - if len(info['tei_xml']) > 12000000: + info["status"] = "success" + info["tei_xml"] = grobid_response.text + if len(info["tei_xml"]) > 12000000: # XML is larger than Kafka message size, and much larger than # an article in general; bail out - info['status'] = 'error' - info['error_msg'] = "response XML too large: {} bytes".format( - len(info['tei_xml'])) - info.pop('tei_xml') + info["status"] = "error" + info["error_msg"] = "response XML too large: {} bytes".format( + len(info["tei_xml"]) + ) + 
info.pop("tei_xml") else: # response.text is .content decoded as utf-8 - info['status'] = 'error' - info['error_msg'] = grobid_response.text[:10000] + info["status"] = "error" + info["error_msg"] = grobid_response.text[:10000] return info def metadata(self, result: Dict[str, Any]) -> Optional[Dict[str, Any]]: - if result['status'] != 'success': + if result["status"] != "success": return None - tei_json = teixml2json(result['tei_xml'], encumbered=False) + tei_json = teixml2json(result["tei_xml"], encumbered=False) meta = dict() biblio = dict() for k in ( - 'title', - 'authors', - 'journal', - 'date', - 'doi', + "title", + "authors", + "journal", + "date", + "doi", ): if tei_json.get(k): biblio[k] = tei_json[k] - meta['biblio'] = biblio - for k in ('grobid_version', 'grobid_timestamp', 'fatcat_release', 'language_code'): + meta["biblio"] = biblio + for k in ("grobid_version", "grobid_timestamp", "fatcat_release", "language_code"): if tei_json.get(k): meta[k] = tei_json[k] return meta class GrobidWorker(SandcrawlerFetchWorker): - def __init__(self, - grobid_client: GrobidClient, - wayback_client: Optional[WaybackClient] = None, - sink: Optional[SandcrawlerWorker] = None, - **kwargs): + def __init__( + self, + grobid_client: GrobidClient, + wayback_client: Optional[WaybackClient] = None, + sink: Optional[SandcrawlerWorker] = None, + **kwargs + ): super().__init__(wayback_client=wayback_client) self.grobid_client = grobid_client self.sink = sink self.consolidate_mode = 0 def timeout_response(self, task: Any) -> Any: - default_key = task['sha1hex'] + default_key = task["sha1hex"] return dict( status="error-timeout", error_msg="internal GROBID worker timeout", @@ -111,16 +114,17 @@ class GrobidWorker(SandcrawlerFetchWorker): def process(self, record: Any, key: Optional[str] = None) -> Any: fetch_result = self.fetch_blob(record) - if fetch_result['status'] != 'success': + if fetch_result["status"] != "success": return fetch_result - blob: bytes = fetch_result['blob'] + blob: bytes = fetch_result["blob"] assert blob and isinstance(blob, bytes) - result = self.grobid_client.process_fulltext(blob, - consolidate_mode=self.consolidate_mode) - result['file_meta'] = gen_file_metadata(blob) - result['source'] = record - result['key'] = result['file_meta']['sha1hex'] + result = self.grobid_client.process_fulltext( + blob, consolidate_mode=self.consolidate_mode + ) + result["file_meta"] = gen_file_metadata(blob) + result["source"] = record + result["key"] = result["file_meta"]["sha1hex"] return result @@ -129,10 +133,10 @@ class GrobidBlobWorker(SandcrawlerWorker): This is sort of like GrobidWorker, except it receives blobs directly, instead of fetching blobs from some remote store. 
""" - def __init__(self, - grobid_client: GrobidClient, - sink: Optional[SandcrawlerWorker] = None, - **kwargs): + + def __init__( + self, grobid_client: GrobidClient, sink: Optional[SandcrawlerWorker] = None, **kwargs + ): super().__init__() self.grobid_client = grobid_client self.sink = sink @@ -141,8 +145,9 @@ class GrobidBlobWorker(SandcrawlerWorker): def process(self, blob: Any, key: Optional[str] = None) -> Any: if not blob: return None - result = self.grobid_client.process_fulltext(blob, - consolidate_mode=self.consolidate_mode) - result['file_meta'] = gen_file_metadata(blob) - result['key'] = result['file_meta']['sha1hex'] + result = self.grobid_client.process_fulltext( + blob, consolidate_mode=self.consolidate_mode + ) + result["file_meta"] = gen_file_metadata(blob) + result["key"] = result["file_meta"]["sha1hex"] return result diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py index 4d36573..5fba963 100644 --- a/python/sandcrawler/html.py +++ b/python/sandcrawler/html.py @@ -7,7 +7,8 @@ from typing import Dict from bs4 import BeautifulSoup RESEARCHSQUARE_REGEX = re.compile( - r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"') + r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"' +) IEEEXPLORE_REGEX = re.compile(r'"pdfPath":"(/.*?\.pdf)"') OVID_JOURNAL_URL_REGEX = re.compile(r'journalURL = "(http.*)";') SCIENCEDIRECT_BOUNCE_URL_REGEX = re.compile(r"window.location = '(http.*)';") @@ -21,9 +22,9 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]: On error, or if fails to extract a URL, returns an empty dict. """ - host_prefix = '/'.join(html_url.split('/')[:3]) + host_prefix = "/".join(html_url.split("/")[:3]) try: - soup = BeautifulSoup(html_body, 'html.parser') + soup = BeautifulSoup(html_body, "html.parser") except TypeError as te: print(f"{te} (url={html_url})", file=sys.stderr) return dict() @@ -34,80 +35,86 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]: ### General Tricks ### # highwire-style meta tag - meta = soup.find('meta', attrs={"name": "citation_pdf_url"}) + meta = soup.find("meta", attrs={"name": "citation_pdf_url"}) if not meta: - meta = soup.find('meta', attrs={"name": "bepress_citation_pdf_url"}) + meta = soup.find("meta", attrs={"name": "bepress_citation_pdf_url"}) if not meta: - meta = soup.find('meta', attrs={"name": "wkhealth_pdf_url"}) + meta = soup.find("meta", attrs={"name": "wkhealth_pdf_url"}) if not meta: # researchgate does this; maybe others also? 
- meta = soup.find('meta', attrs={"property": "citation_pdf_url"}) + meta = soup.find("meta", attrs={"property": "citation_pdf_url"}) if not meta: - meta = soup.find('meta', attrs={"name": "eprints.document_url"}) + meta = soup.find("meta", attrs={"name": "eprints.document_url"}) # if tag is only partially populated - if meta and not meta.get('content'): + if meta and not meta.get("content"): meta = None # wiley has a weird almost-blank page we don't want to loop on if meta and "://onlinelibrary.wiley.com/doi/pdf/" not in html_url: - url = meta['content'].strip() - if '://doi.org/' in url: + url = meta["content"].strip() + if "://doi.org/" in url: print(f"\tdoi.org in citation_pdf_url (loop?): {url}", file=sys.stderr) - elif url.startswith('/'): + elif url.startswith("/"): if host_prefix + url == html_url: print("\tavoiding citation_pdf_url link-loop", file=sys.stderr) else: - return dict(pdf_url=host_prefix + url, technique='citation_pdf_url') - elif url.startswith('http'): + return dict(pdf_url=host_prefix + url, technique="citation_pdf_url") + elif url.startswith("http"): if url == html_url: print("\tavoiding citation_pdf_url link-loop", file=sys.stderr) else: - return dict(pdf_url=url, technique='citation_pdf_url') + return dict(pdf_url=url, technique="citation_pdf_url") else: print("\tmalformed citation_pdf_url? {}".format(url), file=sys.stderr) - meta = soup.find('meta', attrs={"name": "generator"}) + meta = soup.find("meta", attrs={"name": "generator"}) meta_generator = None - if meta and meta.get('content'): - meta_generator = meta['content'].strip() + if meta and meta.get("content"): + meta_generator = meta["content"].strip() ### Publisher/Platform Specific ### # research square (researchsquare.com) - if 'researchsquare.com/article/' in html_url: + if "researchsquare.com/article/" in html_url: # JSON in body with a field like: # "url":"https://assets.researchsquare.com/files/4a57970e-b002-4608-b507-b95967649483/v2/Manuscript.pdf" - m = RESEARCHSQUARE_REGEX.search(html_body.decode('utf-8')) + m = RESEARCHSQUARE_REGEX.search(html_body.decode("utf-8")) if m: url = m.group(1) assert len(url) < 4096 - return dict(release_stage="manuscript", pdf_url=url, technique='publisher') + return dict(release_stage="manuscript", pdf_url=url, technique="publisher") # elseiver linking hub # https://linkinghub.elsevier.com/retrieve/pii/S1569199319308975 - if '://linkinghub.elsevier.com/retrieve/pii/' in html_url: + if "://linkinghub.elsevier.com/retrieve/pii/" in html_url: # redirect = soup.find("input", attrs={"name": "redirectURL"}) if redirect: - url = redirect['value'].strip() - if 'http' in url: + url = redirect["value"].strip() + if "http" in url: url = urllib.parse.unquote(url) # drop any the query parameter - url = url.split('?via')[0] + url = url.split("?via")[0] return dict(next_url=url, technique="elsevier-linkinghub") # sciencedirect PDF URL extract # https://www.sciencedirect.com/science/article/pii/S0169204621000670 - if 'sciencedirect.com/science/article/pii/' in html_url and not html_url.endswith(".pdf"): + if "sciencedirect.com/science/article/pii/" in html_url and not html_url.endswith(".pdf"): json_tag = soup.find("script", attrs={"type": "application/json", "data-iso-key": "_0"}) url = None if json_tag: try: json_text = json_tag.string json_meta = json.loads(json_text) - pdf_meta = json_meta['article']['pdfDownload']['urlMetadata'] + pdf_meta = json_meta["article"]["pdfDownload"]["urlMetadata"] # 
https://www.sciencedirect.com/science/article/pii/S0169204621000670/pdfft?md5=c4a83d06b334b627ded74cf9423bfa56&pid=1-s2.0-S0169204621000670-main.pdf - url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams'][ - 'md5'] + "&pid=" + pdf_meta['queryParams']['pid'] + url = ( + html_url + + pdf_meta["pdfExtension"] + + "?md5=" + + pdf_meta["queryParams"]["md5"] + + "&pid=" + + pdf_meta["queryParams"]["pid"] + ) except (KeyError, TypeError, json.JSONDecodeError): pass if url: @@ -115,9 +122,9 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]: # sciencedirect PDF bounce page # https://www.sciencedirect.com/science/article/pii/S2590109519300424/pdfft?md5=854f43a44de186eb58674b8e20631691&pid=1-s2.0-S2590109519300424-main.pdf - if '://www.sciencedirect.com/' in html_url and html_url.endswith(".pdf"): + if "://www.sciencedirect.com/" in html_url and html_url.endswith(".pdf"): # window.location = 'https://pdf.sciencedirectassets.com/320270/AIP/1-s2.0-S2590109519300424/main.pdf?X-Amz-Security-Token=[...]&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20200110T210936Z&X-Amz-SignedHeaders=host&X-Amz-Expires=300&X-Amz-Credential=ASIAQ3PHCVTY23CMDBNC%2F20200110%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=[...]&hash=[...]&host=[...]&pii=S2590109519300424&tid=spdf-74468ebd-6be6-43ac-b294-ced86e8eea58&sid=[...]&type=client'; - m = SCIENCEDIRECT_BOUNCE_URL_REGEX.search(html_body.decode('utf-8')) + m = SCIENCEDIRECT_BOUNCE_URL_REGEX.search(html_body.decode("utf-8")) if m: url = m.group(1) assert len(url) < 4000 @@ -125,34 +132,34 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]: # ieeexplore.ieee.org # https://ieeexplore.ieee.org/document/8730316 - if '://ieeexplore.ieee.org/document/' in html_url: + if "://ieeexplore.ieee.org/document/" in html_url: # JSON in body with a field like: # "pdfPath":"/iel7/6287639/8600701/08730316.pdf", - m = IEEEXPLORE_REGEX.search(html_body.decode('utf-8')) + m = IEEEXPLORE_REGEX.search(html_body.decode("utf-8")) if m: url = m.group(1) assert len(url) < 4096 - return dict(release_stage="published", - pdf_url=host_prefix + url, - technique="ieeexplore") + return dict( + release_stage="published", pdf_url=host_prefix + url, technique="ieeexplore" + ) # https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=8730313 - if '://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber' in html_url: + if "://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber" in html_url: # HTML iframe like: # iframe = soup.find("iframe") - if iframe and '.pdf' in iframe['src']: - return dict(pdf_url=iframe['src'], technique="iframe") + if iframe and ".pdf" in iframe["src"]: + return dict(pdf_url=iframe["src"], technique="iframe") # https://insights.ovid.com/crossref?an=00042307-202001000-00013 # Ovid is some kind of landing page bounce portal tracking run-around. 
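
All of these publisher-specific branches funnel into the same small return shape: an empty dict on failure, otherwise a pdf_url or next_url plus a technique label. A hedged sketch of calling extract_fulltext_url(), with the landing-page fetch done here via requests purely for illustration (the URL is hypothetical):

import requests

from sandcrawler.html import extract_fulltext_url

# hypothetical landing page; any fetched HTML bytes will do
html_url = "https://example.com/article/123"
resp = requests.get(html_url, timeout=30.0)

hit = extract_fulltext_url(html_url, resp.content)
if "pdf_url" in hit:
    print("direct PDF:", hit["pdf_url"], "via", hit.get("technique"))
elif "next_url" in hit:
    print("bounce/redirect page, follow:", hit["next_url"])
else:
    print("no fulltext URL extracted")
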
# Can extract actual journal URL from javascript blob in the HTML - if '://insights.ovid.com/crossref' in html_url: + if "://insights.ovid.com/crossref" in html_url: # var journalURL = "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"; - m = OVID_JOURNAL_URL_REGEX.search(html_body.decode('utf-8')) + m = OVID_JOURNAL_URL_REGEX.search(html_body.decode("utf-8")) if m: url = m.group(1) assert len(url) < 4096 - return dict(next_url=url, technique='ovid') + return dict(next_url=url, technique="ovid") # osf.io # https://osf.io/8phvx/ @@ -160,41 +167,44 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]: # wow, they ship total javascript crud! going to just guess download URL # based on URL for now. Maybe content type header would help? OSF_DOMAINS = [ - '://osf.io/', - '://biohackrxiv.org/', - '://psyarxiv.com/', - '://arabixiv.org/', - '://engrxiv.org/', - '://edarxiv.org//', - '://ecsarxiv.org/', - '://ecoevorxiv.org/', - '://frenxiv.org/', - '://indiarxiv.org/', - '://mindrxiv.org/', - '://mediarxiv.org/', - '://paleorxiv.org/', - '://thesiscommons.org/', + "://osf.io/", + "://biohackrxiv.org/", + "://psyarxiv.com/", + "://arabixiv.org/", + "://engrxiv.org/", + "://edarxiv.org//", + "://ecsarxiv.org/", + "://ecoevorxiv.org/", + "://frenxiv.org/", + "://indiarxiv.org/", + "://mindrxiv.org/", + "://mediarxiv.org/", + "://paleorxiv.org/", + "://thesiscommons.org/", ] for domain in OSF_DOMAINS: - if domain in html_url and (len(html_url.split('/')) in [4, 5] or '/preprints/' - in html_url) and '/download' not in html_url: + if ( + domain in html_url + and (len(html_url.split("/")) in [4, 5] or "/preprints/" in html_url) + and "/download" not in html_url + ): if not html_url.endswith("/"): next_url = html_url + "/download" else: next_url = html_url + "download" - return dict(next_url=next_url, technique='osf-by-url') + return dict(next_url=next_url, technique="osf-by-url") # wiley # https://onlinelibrary.wiley.com/doi/pdf/10.1111/1467-923X.12787 if "://onlinelibrary.wiley.com/doi/pdf/" in html_url: if b"/doi/pdfdirect/" in html_body: - next_url = html_url.replace('/doi/pdf/', '/doi/pdfdirect/') - return dict(next_url=next_url, technique='wiley-pdfdirect') + next_url = html_url.replace("/doi/pdf/", "/doi/pdfdirect/") + return dict(next_url=next_url, technique="wiley-pdfdirect") # arxiv abstract pages if "://arxiv.org/abs/" in html_url: url = html_url.replace("/abs/", "/pdf/") - return dict(pdf_url=url, technique='arxiv-url') + return dict(pdf_url=url, technique="arxiv-url") # american archivist (OA) # https://americanarchivist.org/doi/abs/10.17723/aarc.62.2.j475270470145630 @@ -202,28 +212,28 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]: # use a more aggressive direct guess to avoid rate-limiting... if "/doi/10." 
in html_url: url = html_url.replace("/doi/10.", "/doi/pdf/10.") - return dict(pdf_url=url, technique='archivist-url') + return dict(pdf_url=url, technique="archivist-url") # - hrefs = soup.find_all('a', attrs={"target": "_blank"}) + hrefs = soup.find_all("a", attrs={"target": "_blank"}) for href in hrefs: - url = href['href'].strip() + url = href["href"].strip() if "/doi/pdf/" in url: - if url.startswith('http'): - return dict(pdf_url=url, technique='publisher-href') - elif url.startswith('/'): - return dict(pdf_url=host_prefix + url, technique='publisher-href') + if url.startswith("http"): + return dict(pdf_url=url, technique="publisher-href") + elif url.startswith("/"): + return dict(pdf_url=host_prefix + url, technique="publisher-href") # protocols.io # https://www.protocols.io/view/flow-cytometry-protocol-mgdc3s6 if "://www.protocols.io/view/" in html_url and not html_url.endswith(".pdf"): url = html_url + ".pdf" - return dict(pdf_url=url, technique='protocolsio-url') + return dict(pdf_url=url, technique="protocolsio-url") # degruyter.com # https://www.degruyter.com/view/books/9783486594621/9783486594621-009/9783486594621-009.xml if "://www.degruyter.com/view/" in html_url and html_url.endswith(".xml"): - url = html_url.replace('/view/', '/downloadpdf/').replace('.xml', '.pdf') - return dict(pdf_url=url, technique='degruyter-url') + url = html_url.replace("/view/", "/downloadpdf/").replace(".xml", ".pdf") + return dict(pdf_url=url, technique="degruyter-url") # journals.lww.com (Wolters Kluwer) # https://journals.lww.com/spinejournal/Abstract/publishahead/Making_the_Most_of_Systematic_Reviews_and.94318.aspx @@ -231,31 +241,32 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]: # we never get the content. if "://journals.lww.com/" in html_url and False: # data-pdf-url="https://pdfs.journals.lww.com/spinejournal/9000/00000/Making_the_Most_of_Systematic_Reviews_and.94318.pdf?token=method|ExpireAbsolute;source|Journals;ttl|1582413672903;payload|mY8D3u1TCCsNvP5E421JYK6N6XICDamxByyYpaNzk7FKjTaa1Yz22MivkHZqjGP4kdS2v0J76WGAnHACH69s21Csk0OpQi3YbjEMdSoz2UhVybFqQxA7lKwSUlA502zQZr96TQRwhVlocEp/sJ586aVbcBFlltKNKo+tbuMfL73hiPqJliudqs17cHeLcLbV/CqjlP3IO0jGHlHQtJWcICDdAyGJMnpi6RlbEJaRheGeh5z5uvqz3FLHgPKVXJzdiVgCTnUeUQFYzcJRFhNtc2gv+ECZGji7HUicj1/6h85Y07DBRl1x2MGqlHWXUawD;hash|6cqYBa15ZK407m4VhFfJLw==" - for line in html_body.split(b'\n'): + for line in html_body.split(b"\n"): if b"data-pdf-url=" in line: - line = line.decode('utf-8') - url = line.strip().replace('data-pdf-url=', '').replace('"', '') - if url.startswith('http') and 'pdfs.journals.lww.com' in url: - return dict(pdf_url=url, technique='journals.lww.com-jsvar') + line = line.decode("utf-8") + url = line.strip().replace("data-pdf-url=", "").replace('"', "") + if url.startswith("http") and "pdfs.journals.lww.com" in url: + return dict(pdf_url=url, technique="journals.lww.com-jsvar") # www.ahajournals.org # https://www.ahajournals.org/doi/10.1161/circ.110.19.2977 - if "://www.ahajournals.org/doi/" in html_url and '/doi/pdf/' not in html_url: + if "://www.ahajournals.org/doi/" in html_url and "/doi/pdf/" not in html_url: # PDF download - if b'/doi/pdf/10.' in html_body: - url = html_url.replace('/doi/10.', '/doi/pdf/10.') + if b"/doi/pdf/10." 
in html_body: + url = html_url.replace("/doi/10.", "/doi/pdf/10.") url = url + "?download=true" - return dict(pdf_url=url, technique='ahajournals-url') + return dict(pdf_url=url, technique="ahajournals-url") # ehp.niehs.nih.gov # https://ehp.niehs.nih.gov/doi/full/10.1289/EHP4709 # https://ehp.niehs.nih.gov/doi/10.1289/ehp.113-a51 if "://ehp.niehs.nih.gov/doi/" in html_url: # - if b'/doi/pdf/10.' in html_body: - url = html_url.replace('/doi/full/10.', - '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.') - return dict(pdf_url=url, technique='ehp.niehs.nigh.gov-url') + if b"/doi/pdf/10." in html_body: + url = html_url.replace("/doi/full/10.", "/doi/pdf/10.").replace( + "/doi/10.", "/doi/pdf/10." + ) + return dict(pdf_url=url, technique="ehp.niehs.nigh.gov-url") # cogentoa.com # https://www.cogentoa.com/article/10.1080/23311975.2017.1412873 @@ -263,75 +274,75 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]: # blech, it's a SPA! All JS # https://www.cogentoa.com/article/10.1080/23311975.2017.1412873.pdf url = html_url + ".pdf" - return dict(pdf_url=url, technique='cogentoa-url') + return dict(pdf_url=url, technique="cogentoa-url") # chemrxiv.org (likely to be other figshare domains also) # https://chemrxiv.org/articles/Biradical_Formation_by_Deprotonation_in_Thiazole-Derivatives_The_Hidden_Nature_of_Dasatinib/10101419 - if "://chemrxiv.org/articles/" in html_url or '.figshare.org/articles/' in html_url: + if "://chemrxiv.org/articles/" in html_url or ".figshare.org/articles/" in html_url: # - json_tag = soup.find('script', id="app-data", attrs={"type": "text/json"}) + json_tag = soup.find("script", id="app-data", attrs={"type": "text/json"}) if json_tag and json_tag.string: app_data = json.loads(json_tag.string) # "exportPdfDownloadUrl": "https://s3-eu-west-1.amazonaws.com/itempdf74155353254prod/10101419/Biradical_Formation_by_Deprotonation_in_Thiazole-Derivatives__The_Hidden_Nature_of_Dasatinib_v1.pdf" - url = app_data.get('article', {}).get('exportPdfDownloadUrl') - if url and url.startswith('http'): - return dict(pdf_url=url, technique='figshare-json') + url = app_data.get("article", {}).get("exportPdfDownloadUrl") + if url and url.startswith("http"): + return dict(pdf_url=url, technique="figshare-json") # CNKI COVID-19 landing pages # http://en.gzbd.cnki.net/gzbt/detail/detail.aspx?FileName=HBGF202002003&DbName=GZBJ7920&DbCode=GZBJ - if '://en.gzbd.cnki.net/KCMS/detail/detail.aspx' in html_url: + if "://en.gzbd.cnki.net/KCMS/detail/detail.aspx" in html_url: # PDF Download - href = soup.find('a', attrs={"id": "pdfDown"}) + href = soup.find("a", attrs={"id": "pdfDown"}) if href: - url = href['href'].strip().replace(' ', '') - if not url.startswith('http'): + url = href["href"].strip().replace(" ", "") + if not url.startswith("http"): url = host_prefix + url - return dict(pdf_url=url, technique='cnki-href') + return dict(pdf_url=url, technique="cnki-href") # RWTH AACHEN repository - if '://publications.rwth-aachen.de/record/' in html_url: - record_id = html_url.split('/')[-1] + if "://publications.rwth-aachen.de/record/" in html_url: + record_id = html_url.split("/")[-1] url = f"{html_url}/files/{record_id}.pdf" - if record_id.isdigit() and url.encode('utf-8') in html_body: - return dict(pdf_url=url, technique='rwth-aachen-url') + if record_id.isdigit() and url.encode("utf-8") in html_body: + return dict(pdf_url=url, technique="rwth-aachen-url") # physchemaspects.ru - if '://physchemaspects.ru/' in html_url and soup: - for href in soup.find_all('a'): + if 
"://physchemaspects.ru/" in html_url and soup: + for href in soup.find_all("a"): if href.text == "download PDF file": - url = href['href'] - if url.startswith('/'): + url = href["href"] + if url.startswith("/"): url = host_prefix + url - return dict(pdf_url=url, technique='physchemaspects-href') + return dict(pdf_url=url, technique="physchemaspects-href") # OJS 3 (some) if meta_generator and meta_generator.startswith("Open Journal Systems"): - href = soup.find('a', attrs={"class": "obj_galley_link file"}) + href = soup.find("a", attrs={"class": "obj_galley_link file"}) if href and href.text and "pdf" in href.text.lower(): - url = href['href'].strip() - if url.startswith('/'): + url = href["href"].strip() + if url.startswith("/"): url = host_prefix + url - return dict(pdf_url=url, technique='ojs-galley-href') + return dict(pdf_url=url, technique="ojs-galley-href") # ETH zurich e-periodica - if '://www.e-periodica.ch/digbib/view' in html_url: - url = html_url.replace('digbib/view', 'cntmng').split('#')[0] - if url.encode('utf-8') in html_body: - return dict(pdf_url=url, technique='href-eperiodica') + if "://www.e-periodica.ch/digbib/view" in html_url: + url = html_url.replace("digbib/view", "cntmng").split("#")[0] + if url.encode("utf-8") in html_body: + return dict(pdf_url=url, technique="href-eperiodica") # JMIR # https://mhealth.jmir.org/2020/7/e17891/ - if '.jmir.org/' in html_url and "/pdf" not in html_url and html_url.endswith("/"): + if ".jmir.org/" in html_url and "/pdf" not in html_url and html_url.endswith("/"): url = html_url + "pdf" - return dict(pdf_url=url, technique='jmir-url') + return dict(pdf_url=url, technique="jmir-url") ### below here we are doing guesses # generic guess: try current URL plus .pdf, if it exists in the HTML body - if '.pdf' not in html_url: + if ".pdf" not in html_url: url = html_url + ".pdf" - if url.encode('utf-8') in html_body: - return dict(pdf_url=url, technique='guess-url-plus-pdf') + if url.encode("utf-8") in html_body: + return dict(pdf_url=url, technique="guess-url-plus-pdf") return dict() @@ -343,8 +354,10 @@ def test_regex() -> None: asdf""" m = OVID_JOURNAL_URL_REGEX.search(lines) assert m - assert m.group( - 1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689" + assert ( + m.group(1) + == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689" + ) lines = """ window.onload = function () { diff --git a/python/sandcrawler/html_metadata.py b/python/sandcrawler/html_metadata.py index e2e673f..1ab667c 100644 --- a/python/sandcrawler/html_metadata.py +++ b/python/sandcrawler/html_metadata.py @@ -30,7 +30,9 @@ HEAD_META_PATTERNS: Dict[str, List[str]] = { "meta[name='dcterms.title']", "meta[name='dc.title']", ], - "subtitle": ["meta[name='prism.subtitle']", ], + "subtitle": [ + "meta[name='prism.subtitle']", + ], "doi": [ "meta[name='citation_doi']", "meta[name='DOI']", @@ -40,7 +42,9 @@ HEAD_META_PATTERNS: Dict[str, List[str]] = { "meta[name='dc.identifier.doi']", "meta[name='dc.identifier'][scheme='doi']", ], - "pmid": ["meta[name='citation_pmid']", ], + "pmid": [ + "meta[name='citation_pmid']", + ], "abstract": [ "meta[name='citation_abstract']", "meta[name='bepress_citation_abstract']", @@ -61,7 +65,9 @@ HEAD_META_PATTERNS: Dict[str, List[str]] = { "meta[name='dc.source']", "meta[property='og:site_name']", ], - "container_abbrev": ["meta[name='citation_journal_abbrev']", ], + "container_abbrev": [ + "meta[name='citation_journal_abbrev']", + ], "raw_date": [ "meta[name='citation_publication_date']", 
"meta[name='bepress_citation_publication_date']", @@ -162,7 +168,9 @@ HEAD_META_LIST_PATTERNS: Dict[str, List[str]] = { "meta[name='dc.contributor']", ], # TODO: citation_author_institution - "raw_references": ["meta[name='citation_reference']", ], + "raw_references": [ + "meta[name='citation_reference']", + ], "raw_identifiers": [ "meta[name='eprints.id_number']", "meta[name='dcterms.identifier']", @@ -646,8 +654,9 @@ class BiblioMetadata(pydantic.BaseModel): json_encoders = {datetime.date: lambda dt: dt.isoformat()} -def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, - patterns: List[dict]) -> Optional[Tuple[str, str]]: +def html_extract_fulltext_url( + doc_url: str, doc: HTMLParser, patterns: List[dict] +) -> Optional[Tuple[str, str]]: """ Tries to quickly extract fulltext URLs using a set of patterns. This function is intendend to be generic across various extraction techniques. @@ -656,36 +665,36 @@ def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, """ self_doc_url: Optional[Tuple[str, str]] = None for pattern in patterns: - if 'selector' not in pattern: + if "selector" not in pattern: continue - if 'in_doc_url' in pattern: - if pattern['in_doc_url'] not in doc_url: + if "in_doc_url" in pattern: + if pattern["in_doc_url"] not in doc_url: continue - elem = doc.css_first(pattern['selector']) + elem = doc.css_first(pattern["selector"]) if not elem: continue val = None - if 'attr' in pattern: - val = elem.attrs.get(pattern['attr']) - elif pattern.get('use_body'): + if "attr" in pattern: + val = elem.attrs.get(pattern["attr"]) + elif pattern.get("use_body"): val = elem.text() - if '://' not in val: + if "://" not in val: continue if not val: continue val = urllib.parse.urljoin(doc_url, val) assert val - if 'in_fulltext_url' in pattern: - if pattern['in_fulltext_url'] not in val: + if "in_fulltext_url" in pattern: + if pattern["in_fulltext_url"] not in val: continue for skip_pattern in FULLTEXT_URL_PATTERNS_SKIP: if skip_pattern in val.lower(): continue if url_fuzzy_equal(doc_url, val): # don't link to self, unless no other options - self_doc_url = (val, pattern.get('technique', 'unknown')) + self_doc_url = (val, pattern.get("technique", "unknown")) continue - return (val, pattern.get('technique', 'unknown')) + return (val, pattern.get("technique", "unknown")) if self_doc_url: print(" WARN: returning fulltext URL pointing to self", file=sys.stderr) return self_doc_url @@ -703,9 +712,9 @@ def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadat for field, patterns in HEAD_META_PATTERNS.items(): for pattern in patterns: val = head.css_first(pattern) - #print((field, pattern, val)) - if val and 'content' in val.attrs and val.attrs['content']: - meta[field] = val.attrs['content'] + # print((field, pattern, val)) + if val and "content" in val.attrs and val.attrs["content"]: + meta[field] = val.attrs["content"] break for field, patterns in HEAD_META_LIST_PATTERNS.items(): @@ -713,53 +722,53 @@ def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadat val_list = head.css(pattern) if val_list: for val in val_list: - if 'content' in val.attrs and val.attrs['content']: + if "content" in val.attrs and val.attrs["content"]: if field not in meta: meta[field] = [] - meta[field].append(val.attrs['content']) + meta[field].append(val.attrs["content"]) break # (some) fulltext extractions pdf_fulltext_url = html_extract_fulltext_url(doc_url, doc, PDF_FULLTEXT_PATTERNS) if pdf_fulltext_url: - meta['pdf_fulltext_url'] = pdf_fulltext_url[0] + 
meta["pdf_fulltext_url"] = pdf_fulltext_url[0] xml_fulltext_url = html_extract_fulltext_url(doc_url, doc, XML_FULLTEXT_PATTERNS) if xml_fulltext_url: - meta['xml_fulltext_url'] = xml_fulltext_url[0] + meta["xml_fulltext_url"] = xml_fulltext_url[0] html_fulltext_url = html_extract_fulltext_url(doc_url, doc, HTML_FULLTEXT_PATTERNS) if html_fulltext_url: - meta['html_fulltext_url'] = html_fulltext_url[0] + meta["html_fulltext_url"] = html_fulltext_url[0] component_url = html_extract_fulltext_url(doc_url, doc, COMPONENT_FULLTEXT_PATTERNS) if component_url: - meta['component_url'] = component_url[0] + meta["component_url"] = component_url[0] # TODO: replace with clean_doi() et al - if meta.get('doi') and meta.get('doi').startswith('doi:'): - meta['doi'] = meta['doi'][4:] + if meta.get("doi") and meta.get("doi").startswith("doi:"): + meta["doi"] = meta["doi"][4:] - raw_identifiers = meta.pop('raw_identifiers', []) + raw_identifiers = meta.pop("raw_identifiers", []) for ident in raw_identifiers: - if ident.startswith('doi:10.'): - if 'doi' not in meta: - meta['doi'] = ident.replace('doi:', '') - elif ident.startswith('10.') and '/' in ident: - if 'doi' not in meta: - meta['doi'] = ident - elif ident.startswith('isbn:'): - if 'isbn' not in meta: - meta['isbn'] = ident.replace('isbn:', '') - - raw_date = meta.pop('raw_date', None) + if ident.startswith("doi:10."): + if "doi" not in meta: + meta["doi"] = ident.replace("doi:", "") + elif ident.startswith("10.") and "/" in ident: + if "doi" not in meta: + meta["doi"] = ident + elif ident.startswith("isbn:"): + if "isbn" not in meta: + meta["isbn"] = ident.replace("isbn:", "") + + raw_date = meta.pop("raw_date", None) if raw_date: parsed = dateparser.parse(raw_date) if parsed: - meta['release_date'] = parsed.date() + meta["release_date"] = parsed.date() - raw_release_type = meta.pop('raw_release_type', None) + raw_release_type = meta.pop("raw_release_type", None) if raw_release_type: release_type = RELEASE_TYPE_MAP.get(raw_release_type.lower().strip()) if release_type: - meta['release_type'] = release_type + meta["release_type"] = release_type return BiblioMetadata(**meta) @@ -786,29 +795,26 @@ def load_adblock_rules() -> braveblock.Adblocker: "||pbs.twimg.com^", "||badge.dimensions.ai^", "||recaptcha.net^", - # not sure about these CC badges (usually via a redirect) - #"||licensebuttons.net^", - #"||i.creativecommons.org^", - + # "||licensebuttons.net^", + # "||i.creativecommons.org^", # Should we skip jquery, or other generic javascript CDNs? 
- #"||code.jquery.com^", - #"||ajax.googleapis.com^", - #"||cdnjs.cloudflare.com^", - + # "||code.jquery.com^", + # "||ajax.googleapis.com^", + # "||cdnjs.cloudflare.com^", # badges, "share" buttons, tracking, etc "apis.google.com/js/plusone", "www.google.com/recaptcha/", "js/_getUACode.js" - # PLOS images "/resource/img/icon.*.16.png^", ], ) -def _extract_generic(doc: HTMLParser, selector: str, attrs: List[str], - type_name: str) -> List[Dict[str, str]]: +def _extract_generic( + doc: HTMLParser, selector: str, attrs: List[str], type_name: str +) -> List[Dict[str, str]]: resources = [] for node in doc.css(selector): @@ -818,21 +824,22 @@ def _extract_generic(doc: HTMLParser, selector: str, attrs: List[str], url = node.attrs.get(attr) # special-case a couple meta URI prefixes which don't match with adblock rules skip = False - for prefix in ['about:', 'data:', 'magnet:', 'urn:', 'mailto:']: + for prefix in ["about:", "data:", "magnet:", "urn:", "mailto:"]: if url and url.startswith(prefix): skip = True break if skip: continue if url: - #print(url, file=sys.stderr) + # print(url, file=sys.stderr) resources.append(dict(url=url.strip(), type=type_name)) return resources -def html_extract_resources(doc_url: str, doc: HTMLParser, - adblock: braveblock.Adblocker) -> List[Dict[str, str]]: +def html_extract_resources( + doc_url: str, doc: HTMLParser, adblock: braveblock.Adblocker +) -> List[Dict[str, str]]: """ This function tries to find all the important resources in a page. The presumption is that the HTML document is article fulltext, and we want the @@ -860,12 +867,14 @@ def html_extract_resources(doc_url: str, doc: HTMLParser, # ensure URLs are absolute for r in resources: - r['url'] = urllib.parse.urljoin(doc_url, r['url']) + r["url"] = urllib.parse.urljoin(doc_url, r["url"]) # filter using adblocker resources = [ - r for r in resources if adblock.check_network_urls( - r['url'], source_url=doc_url, request_type=r['type']) is False + r + for r in resources + if adblock.check_network_urls(r["url"], source_url=doc_url, request_type=r["type"]) + is False ] # remove duplicates diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py index 8f28d42..99a7f36 100644 --- a/python/sandcrawler/ia.py +++ b/python/sandcrawler/ia.py @@ -34,50 +34,63 @@ class SandcrawlerBackoffError(Exception): be passed up through any timeout/retry code and become an actual long pause or crash. 
""" + pass -ResourceResult = namedtuple("ResourceResult", [ - "start_url", - "hit", - "status", - "terminal_url", - "terminal_dt", - "terminal_status_code", - "body", - "cdx", - "revisit_cdx", -]) - -WarcResource = namedtuple("WarcResource", [ - "status_code", - "location", - "body", - "revisit_cdx", -]) - -CdxRow = namedtuple('CdxRow', [ - 'surt', - 'datetime', - 'url', - 'mimetype', - 'status_code', - 'sha1b32', - 'sha1hex', - 'warc_csize', - 'warc_offset', - 'warc_path', -]) - -CdxPartial = namedtuple('CdxPartial', [ - 'surt', - 'datetime', - 'url', - 'mimetype', - 'status_code', - 'sha1b32', - 'sha1hex', -]) +ResourceResult = namedtuple( + "ResourceResult", + [ + "start_url", + "hit", + "status", + "terminal_url", + "terminal_dt", + "terminal_status_code", + "body", + "cdx", + "revisit_cdx", + ], +) + +WarcResource = namedtuple( + "WarcResource", + [ + "status_code", + "location", + "body", + "revisit_cdx", + ], +) + +CdxRow = namedtuple( + "CdxRow", + [ + "surt", + "datetime", + "url", + "mimetype", + "status_code", + "sha1b32", + "sha1hex", + "warc_csize", + "warc_offset", + "warc_path", + ], +) + +CdxPartial = namedtuple( + "CdxPartial", + [ + "surt", + "datetime", + "url", + "mimetype", + "status_code", + "sha1b32", + "sha1hex", + ], +) def cdx_partial_from_row(row: Union[CdxRow, CdxPartial]) -> CdxPartial: @@ -102,10 +115,10 @@ def cdx_to_dict(cdx: Union[CdxRow, CdxPartial]) -> Dict[str, Any]: "sha1b32": cdx.sha1b32, "sha1hex": cdx.sha1hex, } - if type(cdx) == CdxRow and '/' in cdx.warc_path: - d['warc_csize'] = cdx.warc_csize - d['warc_offset'] = cdx.warc_offset - d['warc_path'] = cdx.warc_path + if type(cdx) == CdxRow and "/" in cdx.warc_path: + d["warc_csize"] = cdx.warc_csize + d["warc_offset"] = cdx.warc_offset + d["warc_path"] = cdx.warc_path return d @@ -116,9 +129,9 @@ def fuzzy_match_url(left: str, right: str) -> bool: """ if left == right: return True - if '://' in left and '://' in right: - left = '://'.join(left.split('://')[1:]) - right = '://'.join(right.split('://')[1:]) + if "://" in left and "://" in right: + left = "://".join(left.split("://")[1:]) + right = "://".join(right.split("://")[1:]) if left == right: return True if left == right + "/" or right == left + "/": @@ -149,14 +162,17 @@ class CdxApiClient: def __init__(self, host_url: str = "https://web.archive.org/cdx/search/cdx", **kwargs): self.host_url = host_url self.http_session = requests_retry_session(retries=3, backoff_factor=3) - cdx_auth_token = kwargs.get('cdx_auth_token', os.environ.get('CDX_AUTH_TOKEN')) + cdx_auth_token = kwargs.get("cdx_auth_token", os.environ.get("CDX_AUTH_TOKEN")) if not cdx_auth_token: raise Exception( - "CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)") - self.http_session.headers.update({ - 'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient', - 'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token), - }) + "CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)" + ) + self.http_session.headers.update( + { + "User-Agent": "Mozilla/5.0 sandcrawler.CdxApiClient", + "Cookie": "cdx_auth_token={}".format(cdx_auth_token), + } + ) def _query_api(self, params: Dict[str, str]) -> Optional[List[CdxRow]]: """ @@ -165,7 +181,7 @@ class CdxApiClient: resp = self.http_session.get(self.host_url, params=params) if resp.status_code != 200: raise CdxApiError(resp.text) - #print(resp.url, file=sys.stderr) + # print(resp.url, file=sys.stderr) if not resp.text: return None rj = resp.json() @@ -187,7 +203,7 @@ class CdxApiClient: status_code = 
int(raw[4]) # CDX rows with no WARC records? - if raw[8] == '-' or raw[9] == '-' or raw[10] == '-': + if raw[8] == "-" or raw[9] == "-" or raw[10] == "-": continue row = CdxRow( @@ -206,28 +222,31 @@ class CdxApiClient: rows.append(row) return rows - def fetch(self, - url: str, - datetime: str, - filter_status_code: Optional[int] = None, - retry_sleep: Optional[int] = None) -> CdxRow: + def fetch( + self, + url: str, + datetime: str, + filter_status_code: Optional[int] = None, + retry_sleep: Optional[int] = None, + ) -> CdxRow: """ Fetches a single CDX row by url/datetime. Raises a KeyError if not found, because we expect to be looking up a specific full record. """ if len(datetime) != 14: raise ValueError( - "CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime)) + "CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime) + ) params: Dict[str, str] = { - 'url': url, - 'from': datetime, - 'to': datetime, - 'matchType': 'exact', - 'limit': "1", - 'output': 'json', + "url": url, + "from": datetime, + "to": datetime, + "matchType": "exact", + "limit": "1", + "output": "json", } if filter_status_code: - params['filter'] = "statuscode:{}".format(filter_status_code) + params["filter"] = "statuscode:{}".format(filter_status_code) resp = self._query_api(params) if not resp: if retry_sleep and retry_sleep > 0: @@ -235,37 +254,43 @@ class CdxApiClient: if retry_sleep > 3: next_sleep = retry_sleep - 3 retry_sleep = 3 - print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), - file=sys.stderr) + print( + " CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), + file=sys.stderr, + ) time.sleep(retry_sleep) - return self.fetch(url, - datetime, - filter_status_code=filter_status_code, - retry_sleep=next_sleep) + return self.fetch( + url, datetime, filter_status_code=filter_status_code, retry_sleep=next_sleep + ) raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime)) row = resp[0] # allow fuzzy http/https match if not (fuzzy_match_url(row.url, url) and row.datetime == datetime): if retry_sleep and retry_sleep > 0: - print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), - file=sys.stderr) + print( + " CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), + file=sys.stderr, + ) time.sleep(retry_sleep) - return self.fetch(url, - datetime, - filter_status_code=filter_status_code, - retry_sleep=None) + return self.fetch( + url, datetime, filter_status_code=filter_status_code, retry_sleep=None + ) raise KeyError( "Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format( - url, datetime, row)) + url, datetime, row + ) + ) if filter_status_code: assert row.status_code == filter_status_code return row - def lookup_best(self, - url: str, - max_age_days: Optional[int] = None, - best_mimetype: Optional[str] = None, - closest: Union[datetime.datetime, str, None] = None) -> Optional[CdxRow]: + def lookup_best( + self, + url: str, + max_age_days: Optional[int] = None, + best_mimetype: Optional[str] = None, + closest: Union[datetime.datetime, str, None] = None, + ) -> Optional[CdxRow]: """ Fetches multiple CDX rows for the given URL, tries to find the most recent. 
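
Both fetch() (exact URL plus full 14-digit timestamp) and lookup_best() (fuzzy, most-recent-preferred) return CdxRow namedtuples. A short sketch, assuming CDX_AUTH_TOKEN is set in the environment and using an illustrative URL:

from sandcrawler.ia import CdxApiClient

client = CdxApiClient()   # reads CDX_AUTH_TOKEN from the environment

# exact lookup: full 14-digit timestamp required; raises KeyError when not found
row = client.fetch("http://example.com/paper.pdf", "20200110210936")
print(row.status_code, row.mimetype, row.warc_path)

# fuzzy lookup: most recent "best" capture, preferring the given mimetype
best = client.lookup_best("http://example.com/paper.pdf", best_mimetype="application/pdf")
if best:
    print(best.datetime, best.sha1hex)
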
@@ -289,27 +314,26 @@ class CdxApiClient: """ params: Dict[str, str] = { - 'url': url, - 'matchType': 'exact', - 'limit': "-25", - 'output': 'json', + "url": url, + "matchType": "exact", + "limit": "-25", + "output": "json", # Collapsing seems efficient, but is complex; would need to include # other filters and status code in filter #'collapse': 'timestamp:6', - # Revisits now allowed and resolved! #'filter': '!mimetype:warc/revisit', } if max_age_days: since = datetime.date.today() - datetime.timedelta(days=max_age_days) - params['from'] = '%04d%02d%02d' % (since.year, since.month, since.day) + params["from"] = "%04d%02d%02d" % (since.year, since.month, since.day) if closest: if isinstance(closest, datetime.datetime): - params['closest'] = '%04d%02d%02d' % (closest.year, closest.month, closest.day) + params["closest"] = "%04d%02d%02d" % (closest.year, closest.month, closest.day) else: - params['closest'] = closest - params['sort'] = "closest" - #print(params, file=sys.stderr) + params["closest"] = closest + params["sort"] = "closest" + # print(params, file=sys.stderr) rows = self._query_api(params) if not rows: return None @@ -326,7 +350,7 @@ class CdxApiClient: int(r.mimetype == best_mimetype), int(r.mimetype != "warc/revisit"), int(r.datetime[:6]), - int('/' in r.warc_path), + int("/" in r.warc_path), int(r.datetime), ) @@ -358,25 +382,23 @@ class WaybackClient: self.cdx_client = CdxApiClient() # /serve/ instead of /download/ doesn't record view count # this *does* want to be http://, not https:// - self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/') + self.petabox_base_url = kwargs.get("petabox_base_url", "http://archive.org/serve/") # gwb library will fall back to reading from /opt/.petabox/webdata.secret self.petabox_webdata_secret = kwargs.get( - 'petabox_webdata_secret', - os.environ.get('PETABOX_WEBDATA_SECRET'), + "petabox_webdata_secret", + os.environ.get("PETABOX_WEBDATA_SECRET"), ) - self.warc_uri_prefix = kwargs.get('warc_uri_prefix', 'https://archive.org/serve/') + self.warc_uri_prefix = kwargs.get("warc_uri_prefix", "https://archive.org/serve/") self.rstore = None self.max_redirects = 25 self.wayback_endpoint = "https://web.archive.org/web/" self.replay_headers = { - 'User-Agent': 'Mozilla/5.0 sandcrawler.WaybackClient', + "User-Agent": "Mozilla/5.0 sandcrawler.WaybackClient", } - def fetch_petabox(self, - csize: int, - offset: int, - warc_path: str, - resolve_revisit: bool = True) -> WarcResource: + def fetch_petabox( + self, csize: int, offset: int, warc_path: str, resolve_revisit: bool = True + ) -> WarcResource: """ Fetches wayback resource directly from petabox using WARC path/offset/csize. 
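
The warc_csize / warc_offset / warc_path fields of a CdxRow are exactly what fetch_petabox() consumes. A hedged sketch of pulling the stored bytes for a capture, assuming PETABOX_WEBDATA_SECRET is available and that WaybackClient() can be constructed with defaults:

from sandcrawler.ia import CdxApiClient, WaybackClient

cdx_client = CdxApiClient()
wayback = WaybackClient()   # assumes PETABOX_WEBDATA_SECRET is configured

row = cdx_client.lookup_best("http://example.com/paper.pdf", best_mimetype="application/pdf")
if row and "/" in row.warc_path:
    # permanent captures have a real WARC path; liveweb/SPN temporaries do not
    resource = wayback.fetch_petabox(
        csize=row.warc_csize,
        offset=row.warc_offset,
        warc_path=row.warc_path,
    )
    print(resource.status_code, len(resource.body or b""))
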
@@ -401,37 +423,49 @@ class WaybackClient: raise Exception("WaybackClient needs petabox secret to do direct WARC fetches") if "/" not in warc_path: raise ValueError( - "what looks like a liveweb/SPN temporary warc path: {}".format(warc_path)) + "what looks like a liveweb/SPN temporary warc path: {}".format(warc_path) + ) warc_uri = self.warc_uri_prefix + warc_path if not self.rstore: self.rstore = ResourceStore( - loaderfactory=CDXLoaderFactory3(webdata_secret=self.petabox_webdata_secret, )) + loaderfactory=CDXLoaderFactory3( + webdata_secret=self.petabox_webdata_secret, + ) + ) assert self.rstore try: - #print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr) + # print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr) gwb_record = self.rstore.load_resource(warc_uri, offset, csize) except wayback.exception.ResourceUnavailable: print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr) raise PetaboxError( - "failed to load file contents from wayback/petabox (ResourceUnavailable)") + "failed to load file contents from wayback/petabox (ResourceUnavailable)" + ) except wayback.exception.InvalidResource: print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr) raise WaybackContentError( - "failed to load file contents from wayback/petabox (InvalidResource)") + "failed to load file contents from wayback/petabox (InvalidResource)" + ) except urllib3.exceptions.ReadTimeoutError as rte: raise PetaboxError( - "failed to load file contents from wayback/petabox (ReadTimeoutError: {})". - format(rte)) + "failed to load file contents from wayback/petabox (ReadTimeoutError: {})".format( + rte + ) + ) except ValueError as ve: raise PetaboxError( - "failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) + "failed to load file contents from wayback/petabox (ValueError: {})".format(ve) + ) except EOFError as eofe: raise PetaboxError( - "failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) + "failed to load file contents from wayback/petabox (EOFError: {})".format(eofe) + ) except TypeError as te: raise PetaboxError( - "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)" - .format(te)) + "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format( + te + ) + ) except Exception as e: if "while decompressing data: invalid block type" in str(e): raise PetaboxError( @@ -449,8 +483,11 @@ class WaybackClient: raise WaybackContentError("too many HTTP headers (in wayback fetch)") location = gwb_record.get_location() or None - if status_code is None and gwb_record.target_uri.startswith( - b"ftp://") and not gwb_record.is_revisit(): + if ( + status_code is None + and gwb_record.target_uri.startswith(b"ftp://") + and not gwb_record.is_revisit() + ): # TODO: some additional verification here? 
status_code = 226 @@ -463,17 +500,19 @@ class WaybackClient: if not (revisit_uri and revisit_dt): raise WaybackContentError( "revisit record missing URI and/or DT: warc:{} offset:{}".format( - warc_path, offset)) + warc_path, offset + ) + ) # convert revisit_dt # len("2018-07-24T11:56:49"), or with "Z" assert len(revisit_dt) in (19, 20) if type(revisit_uri) is bytes: - revisit_uri = revisit_uri.decode('utf-8') + revisit_uri = revisit_uri.decode("utf-8") if type(revisit_dt) is bytes: - revisit_dt = revisit_dt.decode('utf-8') - revisit_dt = revisit_dt.replace('-', '').replace(':', - '').replace('T', - '').replace('Z', '') + revisit_dt = revisit_dt.decode("utf-8") + revisit_dt = ( + revisit_dt.replace("-", "").replace(":", "").replace("T", "").replace("Z", "") + ) assert len(revisit_dt) == 14 try: revisit_cdx = self.cdx_client.fetch(revisit_uri, revisit_dt) @@ -491,8 +530,10 @@ class WaybackClient: body = gwb_record.open_raw_content().read() except IncompleteRead as ire: raise WaybackError( - "failed to read actual file contents from wayback/petabox (IncompleteRead: {})" - .format(ire)) + "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format( + ire + ) + ) elif status_code is None: raise WaybackContentError("got a None status_code in (W)ARC record") return WarcResource( @@ -502,12 +543,14 @@ class WaybackClient: revisit_cdx=revisit_cdx, ) - def fetch_petabox_body(self, - csize: int, - offset: int, - warc_path: str, - resolve_revisit: bool = True, - expected_status_code: Optional[int] = None) -> bytes: + def fetch_petabox_body( + self, + csize: int, + offset: int, + warc_path: str, + resolve_revisit: bool = True, + expected_status_code: Optional[int] = None, + ) -> bytes: """ Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize. @@ -524,20 +567,22 @@ class WaybackClient: if expected_status_code: if expected_status_code != resource.status_code: - raise KeyError("archived HTTP response (WARC) was not {}: {}".format( - expected_status_code, - resource.status_code, - )) + raise KeyError( + "archived HTTP response (WARC) was not {}: {}".format( + expected_status_code, + resource.status_code, + ) + ) elif resource.status_code not in (200, 226): - raise KeyError("archived HTTP response (WARC) was not 200: {}".format( - resource.status_code)) + raise KeyError( + "archived HTTP response (WARC) was not 200: {}".format(resource.status_code) + ) return resource.body - def fetch_replay_body(self, - url: str, - datetime: str, - cdx_sha1hex: Optional[str] = None) -> bytes: + def fetch_replay_body( + self, url: str, datetime: str, cdx_sha1hex: Optional[str] = None + ) -> bytes: """ Fetches an HTTP 200 record from wayback via the replay interface (web.archive.org) instead of petabox. @@ -570,32 +615,42 @@ class WaybackClient: except UnicodeDecodeError: raise WaybackContentError( "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format( - url)) + url + ) + ) try: resp.raise_for_status() except Exception as e: raise WaybackError(str(e)) - #print(resp.url, file=sys.stderr) + # print(resp.url, file=sys.stderr) # defensively check that this is actually correct replay based on headers if "X-Archive-Src" not in resp.headers: raise WaybackError("replay fetch didn't return X-Archive-Src in headers") if datetime not in resp.url: - raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format( - datetime, resp.url)) + raise WaybackError( + "didn't get exact reply (redirect?) 
datetime:{} got:{}".format( + datetime, resp.url + ) + ) if cdx_sha1hex: # verify that body matches CDX hash # TODO: don't need *all* these hashes, just sha1 file_meta = gen_file_metadata(resp.content) - if cdx_sha1hex != file_meta['sha1hex']: - print(" REPLAY MISMATCH: cdx:{} replay:{}".format(cdx_sha1hex, - file_meta['sha1hex']), - file=sys.stderr) + if cdx_sha1hex != file_meta["sha1hex"]: + print( + " REPLAY MISMATCH: cdx:{} replay:{}".format( + cdx_sha1hex, file_meta["sha1hex"] + ), + file=sys.stderr, + ) raise WaybackContentError( "replay fetch body didn't match CDX hash cdx:{} body:{}".format( - cdx_sha1hex, file_meta['sha1hex']), ) + cdx_sha1hex, file_meta["sha1hex"] + ), + ) return resp.content def fetch_replay_redirect(self, url: str, datetime: str) -> Optional[str]: @@ -625,37 +680,44 @@ class WaybackClient: except UnicodeDecodeError: raise WaybackContentError( "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format( - url)) + url + ) + ) try: resp.raise_for_status() except Exception as e: raise WaybackError(str(e)) - #print(resp.url, file=sys.stderr) + # print(resp.url, file=sys.stderr) # defensively check that this is actually correct replay based on headers # previously check for "X-Archive-Redirect-Reason" here if "X-Archive-Src" not in resp.headers: raise WaybackError("redirect replay fetch didn't return X-Archive-Src in headers") if datetime not in resp.url: - raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format( - datetime, resp.url)) + raise WaybackError( + "didn't get exact reply (redirect?) datetime:{} got:{}".format( + datetime, resp.url + ) + ) redirect_url = resp.headers.get("Location") # eg, https://web.archive.org/web/20200111003923id_/https://dx.doi.org/10.17504/protocols.io.y2gfybw - #print(redirect_url, file=sys.stderr) + # print(redirect_url, file=sys.stderr) if redirect_url and redirect_url.startswith("https://web.archive.org/web/"): redirect_url = "/".join(redirect_url.split("/")[5:]) - #print(redirect_url, file=sys.stderr) + # print(redirect_url, file=sys.stderr) if redirect_url and redirect_url.startswith("http"): redirect_url = clean_url(redirect_url) return redirect_url else: return None - def lookup_resource(self, - start_url: str, - best_mimetype: Optional[str] = None, - closest: Union[str, datetime.datetime, None] = None) -> ResourceResult: + def lookup_resource( + self, + start_url: str, + best_mimetype: Optional[str] = None, + closest: Union[str, datetime.datetime, None] = None, + ) -> ResourceResult: """ Looks in wayback for a resource starting at the URL, following any redirects. 
Returns a ResourceResult object, which may indicate a @@ -684,8 +746,9 @@ class WaybackClient: for i in range(self.max_redirects + 1): print(" URL: {}".format(next_url), file=sys.stderr) next_row: Optional[CdxRow] = self.cdx_client.lookup_best( - next_url, best_mimetype=best_mimetype, closest=closest) - #print(next_row, file=sys.stderr) + next_url, best_mimetype=best_mimetype, closest=closest + ) + # print(next_row, file=sys.stderr) if not next_row: return ResourceResult( start_url=start_url, @@ -702,7 +765,7 @@ class WaybackClient: cdx_row: CdxRow = next_row # first try straight-forward redirect situation - if cdx_row.mimetype == "warc/revisit" and '/' in cdx_row.warc_path: + if cdx_row.mimetype == "warc/revisit" and "/" in cdx_row.warc_path: resource = self.fetch_petabox( csize=cdx_row.warc_csize, offset=cdx_row.warc_offset, @@ -725,7 +788,7 @@ class WaybackClient: if cdx_row.status_code in (200, 226): revisit_cdx = None final_cdx: Union[CdxRow, CdxPartial] = cdx_row - if '/' in cdx_row.warc_path: + if "/" in cdx_row.warc_path: resource = self.fetch_petabox( csize=cdx_row.warc_csize, offset=cdx_row.warc_offset, @@ -751,7 +814,7 @@ class WaybackClient: revisit_cdx=revisit_cdx, ) elif 300 <= (cdx_row.status_code or 0) < 400: - if '/' in cdx_row.warc_path: + if "/" in cdx_row.warc_path: resource = self.fetch_petabox( csize=cdx_row.warc_csize, offset=cdx_row.warc_offset, @@ -848,34 +911,39 @@ class SavePageNowBackoffError(SandcrawlerBackoffError): pass -SavePageNowResult = namedtuple('SavePageNowResult', [ - 'success', - 'status', - 'job_id', - 'request_url', - 'terminal_url', - 'terminal_dt', - 'resources', -]) +SavePageNowResult = namedtuple( + "SavePageNowResult", + [ + "success", + "status", + "job_id", + "request_url", + "terminal_url", + "terminal_dt", + "resources", + ], +) class SavePageNowClient: def __init__(self, v2endpoint: str = "https://web.archive.org/save", **kwargs): - self.ia_access_key = kwargs.get('ia_access_key', os.environ.get('IA_ACCESS_KEY')) - self.ia_secret_key = kwargs.get('ia_secret_key', os.environ.get('IA_SECRET_KEY')) + self.ia_access_key = kwargs.get("ia_access_key", os.environ.get("IA_ACCESS_KEY")) + self.ia_secret_key = kwargs.get("ia_secret_key", os.environ.get("IA_SECRET_KEY")) self.v2endpoint = v2endpoint self.v2_session = requests_retry_session(retries=5, backoff_factor=3) - self.v2_session.headers.update({ - 'User-Agent': 'Mozilla/5.0 sandcrawler.SavePageNowClient', - 'Accept': 'application/json', - 'Authorization': 'LOW {}:{}'.format(self.ia_access_key, self.ia_secret_key), - }) + self.v2_session.headers.update( + { + "User-Agent": "Mozilla/5.0 sandcrawler.SavePageNowClient", + "Accept": "application/json", + "Authorization": "LOW {}:{}".format(self.ia_access_key, self.ia_secret_key), + } + ) # 3 minutes total self.poll_count = 60 self.poll_seconds = 3.0 - self.spn_cdx_retry_sec = kwargs.get('spn_cdx_retry_sec', 9.0) + self.spn_cdx_retry_sec = kwargs.get("spn_cdx_retry_sec", 9.0) # these are special-case web domains for which we want SPN2 to not run # a headless browser (brozzler), but instead simply run wget. 
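
Most callers go through the lookup_resource() flow shown above rather than the lower-level petabox fetches; it follows redirects and reports the outcome as a ResourceResult. A minimal sketch with an illustrative URL:

from sandcrawler.ia import WaybackClient

wayback = WaybackClient()
result = wayback.lookup_resource(
    "https://example.com/article.pdf",
    best_mimetype="application/pdf",
)
if result.hit:
    # terminal_url / terminal_dt describe the capture actually used after redirects
    print(result.terminal_url, result.terminal_dt, len(result.body or b""))
else:
    print("no usable capture:", result.status)
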
@@ -888,20 +956,20 @@ class SavePageNowClient: "://europepmc.org/backend/ptpmcrender.fcgi", "://pdfs.semanticscholar.org/", "://res.mdpi.com/", - # platform sites "://zenodo.org/", "://figshare.org/", "://springernature.figshare.com/", - # popular simple cloud storage or direct links "://s3-eu-west-1.amazonaws.com/", ] - def save_url_now_v2(self, - request_url: str, - force_simple_get: Optional[int] = None, - capture_outlinks: int = 0) -> SavePageNowResult: + def save_url_now_v2( + self, + request_url: str, + force_simple_get: Optional[int] = None, + capture_outlinks: int = 0, + ) -> SavePageNowResult: """ Returns a "SavePageNowResult" (namedtuple) if SPN request was processed at all, or raises an exception if there was an error with SPN itself. @@ -944,33 +1012,39 @@ class SavePageNowClient: resp = self.v2_session.post( self.v2endpoint, data={ - 'url': request_url, - 'capture_all': 1, - 'capture_outlinks': capture_outlinks, - 'capture_screenshot': 0, - 'if_not_archived_within': '1d', - 'force_get': force_simple_get, - 'skip_first_archive': 1, - 'outlinks_availability': 0, - 'js_behavior_timeout': 0, + "url": request_url, + "capture_all": 1, + "capture_outlinks": capture_outlinks, + "capture_screenshot": 0, + "if_not_archived_within": "1d", + "force_get": force_simple_get, + "skip_first_archive": 1, + "outlinks_availability": 0, + "js_behavior_timeout": 0, }, ) if resp.status_code == 429: - raise SavePageNowBackoffError("status_code: {}, url: {}".format( - resp.status_code, request_url)) + raise SavePageNowBackoffError( + "status_code: {}, url: {}".format(resp.status_code, request_url) + ) elif resp.status_code != 200: - raise SavePageNowError("SPN2 status_code: {}, url: {}".format( - resp.status_code, request_url)) + raise SavePageNowError( + "SPN2 status_code: {}, url: {}".format(resp.status_code, request_url) + ) resp_json = resp.json() - if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json[ - 'message']: - raise SavePageNowBackoffError(resp_json['message']) - elif not resp_json or 'job_id' not in resp_json or not resp_json['job_id']: + if ( + resp_json + and "message" in resp_json + and "You have already reached the limit of active sessions" in resp_json["message"] + ): + raise SavePageNowBackoffError(resp_json["message"]) + elif not resp_json or "job_id" not in resp_json or not resp_json["job_id"]: raise SavePageNowError( - "Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json)) + "Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json) + ) - job_id = resp_json['job_id'] + job_id = resp_json["job_id"] print(f" SPNv2 running: job_id={job_id} url={request_url}", file=sys.stderr) # poll until complete @@ -981,53 +1055,59 @@ class SavePageNowClient: resp.raise_for_status() except Exception: raise SavePageNowError(resp.content) - status = resp.json()['status'] - if status == 'pending': + status = resp.json()["status"] + if status == "pending": time.sleep(self.poll_seconds) - elif status in ('success', 'error'): + elif status in ("success", "error"): final_json = resp.json() break else: - raise SavePageNowError("Unknown SPN2 status:{} url:{}".format( - status, request_url)) + raise SavePageNowError( + "Unknown SPN2 status:{} url:{}".format(status, request_url) + ) if not final_json: raise SavePageNowError("SPN2 timed out (polling count exceeded)") # if there was a recent crawl of same URL, fetch the status of that # crawl to get correct datetime - if final_json.get('original_job_id'): - 
print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", - file=sys.stderr) - resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, - final_json['original_job_id'])) + if final_json.get("original_job_id"): + print( + f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", + file=sys.stderr, + ) + resp = self.v2_session.get( + "{}/status/{}".format(self.v2endpoint, final_json["original_job_id"]) + ) try: resp.raise_for_status() except Exception: raise SavePageNowError(resp.content) final_json = resp.json() - #print(final_json, file=sys.stderr) + # print(final_json, file=sys.stderr) - if final_json['status'] == "success": - if final_json.get('original_url').startswith('/'): - print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", - file=sys.stderr) + if final_json["status"] == "success": + if final_json.get("original_url").startswith("/"): + print( + f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", + file=sys.stderr, + ) return SavePageNowResult( True, "success", job_id, request_url, - final_json['original_url'], - final_json['timestamp'], - final_json['resources'], + final_json["original_url"], + final_json["timestamp"], + final_json["resources"], ) else: - if final_json['status'] == 'pending': - final_json['status'] = 'error:pending' + if final_json["status"] == "pending": + final_json["status"] = "error:pending" return SavePageNowResult( False, - final_json.get('status_ext') or final_json['status'], + final_json.get("status_ext") or final_json["status"], job_id, request_url, None, @@ -1035,10 +1115,12 @@ class SavePageNowClient: None, ) - def crawl_resource(self, - start_url: str, - wayback_client: WaybackClient, - force_simple_get: Optional[int] = None) -> ResourceResult: + def crawl_resource( + self, + start_url: str, + wayback_client: WaybackClient, + force_simple_get: Optional[int] = None, + ) -> ResourceResult: """ Runs a SPN2 crawl, then fetches body. 
@@ -1048,18 +1130,23 @@ class SavePageNowClient: """ # HACK: capture CNKI domains with outlinks (for COVID-19 crawling) - if 'gzbd.cnki.net/' in start_url: - spn_result = self.save_url_now_v2(start_url, - force_simple_get=force_simple_get, - capture_outlinks=1) + if "gzbd.cnki.net/" in start_url: + spn_result = self.save_url_now_v2( + start_url, force_simple_get=force_simple_get, capture_outlinks=1 + ) else: spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get) if not spn_result.success: status = spn_result.status - if status in ("error:invalid-url", "error:not-found", - "error:invalid-host-resolution", "error:gateway-timeout", - "error:too-many-redirects", "error:read-timeout"): + if status in ( + "error:invalid-url", + "error:not-found", + "error:invalid-host-resolution", + "error:gateway-timeout", + "error:too-many-redirects", + "error:read-timeout", + ): status = status.replace("error:", "") elif status in ("error:no-access", "error:forbidden"): status = "forbidden" @@ -1070,8 +1157,10 @@ class SavePageNowClient: elif status.startswith("error:"): status = "spn2-" + status # despite other errors, call these a failure (so we don't retry) - if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent') - or spn_result.terminal_url.endswith("cookieSet=1")): + if spn_result.terminal_url and ( + spn_result.terminal_url.endswith("/cookieAbsent") + or spn_result.terminal_url.endswith("cookieSet=1") + ): status = "blocked-cookie" return ResourceResult( start_url=start_url, @@ -1084,10 +1173,10 @@ class SavePageNowClient: cdx=None, revisit_cdx=None, ) - #print(spn_result, file=sys.stderr) + # print(spn_result, file=sys.stderr) # detect partial URL response (aka, success, but missing full URL) - if "://" not in spn_result.terminal_url or spn_result.terminal_url.startswith('/'): + if "://" not in spn_result.terminal_url or spn_result.terminal_url.startswith("/"): return ResourceResult( start_url=start_url, hit=False, @@ -1102,7 +1191,8 @@ class SavePageNowClient: # don't try to CDX fetch for this common cookie block terminal if spn_result.terminal_url.endswith( - '/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"): + "/cookieAbsent" + ) or spn_result.terminal_url.endswith("cookieSet=1"): return ResourceResult( start_url=start_url, hit=False, @@ -1127,7 +1217,7 @@ class SavePageNowClient: cdx_row = elsevier_pdf_cdx else: print(" Failed pdf.sciencedirectassets.com hack!", file=sys.stderr) - #print(elsevier_pdf_cdx, file=sys.stderr) + # print(elsevier_pdf_cdx, file=sys.stderr) if not cdx_row: # lookup exact @@ -1164,11 +1254,11 @@ class SavePageNowClient: revisit_cdx=None, ) - #print(cdx_row, file=sys.stderr) + # print(cdx_row, file=sys.stderr) revisit_cdx = None final_cdx: Union[CdxRow, CdxPartial] = cdx_row - if '/' in cdx_row.warc_path: + if "/" in cdx_row.warc_path: # Usually can't do this kind of direct fetch because CDX result is recent/live resource = wayback_client.fetch_petabox( csize=cdx_row.warc_csize, @@ -1228,12 +1318,19 @@ class SavePageNowClient: ) -def fix_transfer_encoding(file_meta: dict, - resource: ResourceResult) -> Tuple[dict, ResourceResult]: - if resource.body and file_meta[ - 'mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip': - print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype), - file=sys.stderr) +def fix_transfer_encoding( + file_meta: dict, resource: ResourceResult +) -> Tuple[dict, ResourceResult]: + if ( + resource.body + and 
file_meta["mimetype"] == "application/gzip" + and resource.cdx + and resource.cdx.mimetype != "application/gzip" + ): + print( + " transfer encoding not stripped: {}".format(resource.cdx.mimetype), + file=sys.stderr, + ) inner_body = gzip.decompress(resource.body) if not inner_body: raise Exception("null body inside transfer encoding") diff --git a/python/sandcrawler/ingest_file.py b/python/sandcrawler/ingest_file.py index 49c7ddf..4a5abbe 100644 --- a/python/sandcrawler/ingest_file.py +++ b/python/sandcrawler/ingest_file.py @@ -10,15 +10,32 @@ from selectolax.parser import HTMLParser from sandcrawler.db import SandcrawlerPostgrestClient from sandcrawler.grobid import GrobidClient from sandcrawler.html import extract_fulltext_url -from sandcrawler.html_metadata import (html_extract_biblio, html_extract_resources, - load_adblock_rules) -from sandcrawler.ia import (CdxApiError, NoCaptureError, PetaboxError, ResourceResult, - SavePageNowClient, SavePageNowError, WaybackClient, - WaybackContentError, WaybackError, cdx_to_dict, - fix_transfer_encoding) -from sandcrawler.ingest_html import (WebResource, fetch_html_resources, - html_extract_body_teixml, html_guess_platform, - html_guess_scope, quick_fetch_html_resources) +from sandcrawler.html_metadata import ( + html_extract_biblio, + html_extract_resources, + load_adblock_rules, +) +from sandcrawler.ia import ( + CdxApiError, + NoCaptureError, + PetaboxError, + ResourceResult, + SavePageNowClient, + SavePageNowError, + WaybackClient, + WaybackContentError, + WaybackError, + cdx_to_dict, + fix_transfer_encoding, +) +from sandcrawler.ingest_html import ( + WebResource, + fetch_html_resources, + html_extract_body_teixml, + html_guess_platform, + html_guess_scope, + quick_fetch_html_resources, +) from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime from sandcrawler.pdfextract import PdfExtractResult, process_pdf from sandcrawler.workers import SandcrawlerWorker @@ -53,74 +70,71 @@ class IngestFileWorker(SandcrawlerWorker): process_file_hit(ResourceResult) -> response process_grobid(ResourceResult) """ + def __init__(self, sink: Optional[SandcrawlerWorker] = None, **kwargs): super().__init__() self.sink = sink - if kwargs.get('wayback_client'): - self.wayback_client: WaybackClient = kwargs['wayback_client'] + if kwargs.get("wayback_client"): + self.wayback_client: WaybackClient = kwargs["wayback_client"] else: self.wayback_client = WaybackClient() - if kwargs.get('spn_client'): - self.spn_client: SavePageNowClient = kwargs['spn_client'] + if kwargs.get("spn_client"): + self.spn_client: SavePageNowClient = kwargs["spn_client"] else: self.spn_client = SavePageNowClient( - spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) + spn_cdx_retry_sec=kwargs.get("spn_cdx_retry_sec", 9.0) + ) - if kwargs.get('grobid_client'): - self.grobid_client: GrobidClient = kwargs['grobid_client'] + if kwargs.get("grobid_client"): + self.grobid_client: GrobidClient = kwargs["grobid_client"] else: self.grobid_client = GrobidClient() - if kwargs.get('pgrest_client'): - self.pgrest_client: SandcrawlerPostgrestClient = kwargs['pgrest_client'] + if kwargs.get("pgrest_client"): + self.pgrest_client: SandcrawlerPostgrestClient = kwargs["pgrest_client"] else: self.pgrest_client = SandcrawlerPostgrestClient() - self.grobid_sink = kwargs.get('grobid_sink') - self.thumbnail_sink = kwargs.get('thumbnail_sink') - self.pdftext_sink = kwargs.get('pdftext_sink') - self.xmldoc_sink = kwargs.get('xmldoc_sink') - self.htmlteixml_sink = 
kwargs.get('htmlteixml_sink') + self.grobid_sink = kwargs.get("grobid_sink") + self.thumbnail_sink = kwargs.get("thumbnail_sink") + self.pdftext_sink = kwargs.get("pdftext_sink") + self.xmldoc_sink = kwargs.get("xmldoc_sink") + self.htmlteixml_sink = kwargs.get("htmlteixml_sink") self.max_hops = 6 - self.try_existing_ingest = kwargs.get('try_existing_ingest', False) - self.try_existing_grobid = kwargs.get('try_existing_grobid', True) - self.try_existing_pdfextract = kwargs.get('try_existing_pdfextract', True) - self.try_wayback = kwargs.get('try_wayback', True) - self.try_spn2 = kwargs.get('try_spn2', True) - self.html_quick_mode = kwargs.get('html_quick_mode', False) + self.try_existing_ingest = kwargs.get("try_existing_ingest", False) + self.try_existing_grobid = kwargs.get("try_existing_grobid", True) + self.try_existing_pdfextract = kwargs.get("try_existing_pdfextract", True) + self.try_wayback = kwargs.get("try_wayback", True) + self.try_spn2 = kwargs.get("try_spn2", True) + self.html_quick_mode = kwargs.get("html_quick_mode", False) self.adblock_rules = load_adblock_rules() self.max_html_resources = 200 self.base_url_blocklist = [ # robot blocking "://hkvalidate.perfdrive.com/", - # temporary, until we implement specific fetch and 'petabox' output "://archive.org/", "://www.archive.org/", "://web.archive.org/web/", - # out of scope "://openlibrary.org/", "://www.openlibrary.org/", "://fatcat.wiki/", "://orcid.org/", "://doaj.org/", - # Domain squats "://bartandjones.com", "://ijretm.com", "://ijrcemas.com", "://jist.net.in", "://croisements-revue.org", - # all stubs/previews, not full papers "://page-one.live.cf.public.springer.com", - # large datasets-only (no PDF expected) "plutof.ut.ee/", "www.gbif.org/", @@ -129,16 +143,13 @@ class IngestFileWorker(SandcrawlerWorker): "://doi.org/10.25642/ipk/gbis/", "://apex.ipk-gatersleben.de/", "fao.org/glis/", - # Historical non-paper content: "dhz.uni-passau.de/", # newspapers "digital.ucd.ie/", # ireland national historical - # DOI prefixes "doi.org/10.2307/", # JSTOR; slow and many redirects "doi.org/10.18730/", # fao.org: database entry "doi.org/10.15468/", # gbif.org: database entry - # deprecated domain (doesn't redirect correctly) "://edoc.mpg.de/", ] @@ -216,15 +227,14 @@ class IngestFileWorker(SandcrawlerWorker): return None existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url) # TODO: filter on more flags? - if existing and existing['hit'] is True: + if existing and existing["hit"] is True: return existing else: return None - def find_resource(self, - url: str, - best_mimetype: Optional[str] = None, - force_recrawl: bool = False) -> Optional[ResourceResult]: + def find_resource( + self, url: str, best_mimetype: Optional[str] = None, force_recrawl: bool = False + ) -> Optional[ResourceResult]: """ Looks in wayback for a resource starting at the URL, following any redirects. If a hit isn't found, try crawling with SPN. @@ -233,7 +243,8 @@ class IngestFileWorker(SandcrawlerWorker): resource = None if url.startswith("http://web.archive.org/web/") or url.startswith( - "https://web.archive.org/web/"): + "https://web.archive.org/web/" + ): raise NotImplementedError("handling direct wayback links not supported yet") if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"): @@ -247,20 +258,32 @@ class IngestFileWorker(SandcrawlerWorker): soft404 = False # NOTE: these are often not working with SPNv2 either, so disabling. 
If # we really want to try again, should do force-recrawl - #if resource and resource.hit and resource.terminal_url.endswith('/cookieAbsent'): + # if resource and resource.hit and resource.terminal_url.endswith('/cookieAbsent'): # soft404 = True old_failure = False - if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000': + if ( + resource + and not resource.hit + and resource.terminal_dt + and resource.terminal_dt < "20190000000000" + ): old_failure = True - if self.try_spn2 and (resource is None or (resource and resource.status == 'no-capture') - or soft404 or old_failure): + if self.try_spn2 and ( + resource is None + or (resource and resource.status == "no-capture") + or soft404 + or old_failure + ): via = "spn2" resource = self.spn_client.crawl_resource(url, self.wayback_client) - print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status), - (resource and resource.terminal_url) or url), - file=sys.stderr) + print( + "[FETCH {:>6}] {} {}".format( + via, (resource and resource.status), (resource and resource.terminal_url) or url + ), + file=sys.stderr, + ) return resource def process_existing(self, request: dict, result_row: dict) -> dict: @@ -269,51 +292,55 @@ class IngestFileWorker(SandcrawlerWorker): additional processing necessary to return a result. """ raise NotImplementedError("process_existing() not tested or safe yet") - assert result_row['hit'] - existing_file_meta = self.pgrest_client.get_file_meta(result_row['terminal_sha1hex']) - existing_grobid = self.pgrest_client.get_grobid(result_row['terminal_sha1hex']) - existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], - result_row['terminal_dt']) + assert result_row["hit"] + existing_file_meta = self.pgrest_client.get_file_meta(result_row["terminal_sha1hex"]) + existing_grobid = self.pgrest_client.get_grobid(result_row["terminal_sha1hex"]) + existing_cdx = self.pgrest_client.get_cdx( + result_row["terminal_url"], result_row["terminal_dt"] + ) if not (existing_file_meta and existing_grobid and existing_cdx): raise NotImplementedError("partially-exsiting records not implemented yet") result = { - 'hit': result_row['hit'], - 'status': "existing", - 'request': request, - 'grobid': existing_grobid, - 'file_meta': existing_file_meta, - 'cdx': existing_cdx, - 'terminal': { - 'terminal_url': result_row['terminal_url'], - 'terminal_dt': result_row['terminal_dt'], - 'terminal_status_code': result_row['terminal_status_code'], - 'terminal_sha1hex': result_row['terminal_sha1hex'], + "hit": result_row["hit"], + "status": "existing", + "request": request, + "grobid": existing_grobid, + "file_meta": existing_file_meta, + "cdx": existing_cdx, + "terminal": { + "terminal_url": result_row["terminal_url"], + "terminal_dt": result_row["terminal_dt"], + "terminal_status_code": result_row["terminal_status_code"], + "terminal_sha1hex": result_row["terminal_sha1hex"], }, } return result - def process_file_hit(self, ingest_type: str, resource: ResourceResult, - file_meta: dict) -> dict: + def process_file_hit( + self, ingest_type: str, resource: ResourceResult, file_meta: dict + ) -> dict: """ Run all the necessary processing for a new/fresh ingest hit. 
""" - if ingest_type in ["dataset-file", "component" - ] and file_meta['mimetype'] == "application/pdf": + if ( + ingest_type in ["dataset-file", "component"] + and file_meta["mimetype"] == "application/pdf" + ): ingest_type = "pdf" if ingest_type == "pdf": return { - 'grobid': self.process_grobid(resource, file_meta), - 'pdf_meta': self.process_pdfextract(resource, file_meta), + "grobid": self.process_grobid(resource, file_meta), + "pdf_meta": self.process_pdfextract(resource, file_meta), } elif ingest_type == "xml": return { - 'xml_meta': self.process_xml(resource, file_meta), + "xml_meta": self.process_xml(resource, file_meta), } elif ingest_type == "html": html_info = self.process_html(resource, file_meta) # if there is no html_biblio, don't clobber anything possibly extracted earlier - if 'html_biblio' in html_info and not html_info['html_biblio']: - html_info.pop('html_biblio') + if "html_biblio" in html_info and not html_info["html_biblio"]: + html_info.pop("html_biblio") return html_info elif ingest_type == "src": return {} @@ -332,7 +359,7 @@ class IngestFileWorker(SandcrawlerWorker): decide if we should re-process """ if self.try_existing_grobid: - existing = self.pgrest_client.get_grobid(file_meta['sha1hex']) + existing = self.pgrest_client.get_grobid(file_meta["sha1hex"]) if existing: print("found existing GROBID result", file=sys.stderr) return existing @@ -341,18 +368,18 @@ class IngestFileWorker(SandcrawlerWorker): result = self.grobid_client.process_fulltext(resource.body) if self.grobid_sink: # extra fields for GROBID kafka messages - result['file_meta'] = file_meta - result['key'] = result['file_meta']['sha1hex'] + result["file_meta"] = file_meta + result["key"] = result["file_meta"]["sha1hex"] self.grobid_sink.push_record(result.copy()) - if result['status'] == "success": + if result["status"] == "success": metadata = self.grobid_client.metadata(result) if metadata: - result['metadata'] = self.grobid_client.metadata(result) - result['fatcat_release'] = result['metadata'].pop('fatcat_release', None) - result['grobid_version'] = result['metadata'].pop('grobid_version', None) - result.pop('tei_xml', None) - result.pop('file_meta', None) - result.pop('key', None) + result["metadata"] = self.grobid_client.metadata(result) + result["fatcat_release"] = result["metadata"].pop("fatcat_release", None) + result["grobid_version"] = result["metadata"].pop("grobid_version", None) + result.pop("tei_xml", None) + result.pop("file_meta", None) + result.pop("key", None) return result def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict: @@ -365,7 +392,7 @@ class IngestFileWorker(SandcrawlerWorker): TODO: difference between Kafka schema and SQL/postgrest schema """ if self.try_existing_pdfextract: - existing = self.pgrest_client.get_pdf_meta(file_meta['sha1hex']) + existing = self.pgrest_client.get_pdf_meta(file_meta["sha1hex"]) if existing: print("found existing pdf_meta result", file=sys.stderr) result = PdfExtractResult.from_pdf_meta_dict(existing) @@ -373,9 +400,9 @@ class IngestFileWorker(SandcrawlerWorker): # Need to actually processes result = process_pdf(resource.body) - assert result.sha1hex == file_meta['sha1hex'] + assert result.sha1hex == file_meta["sha1hex"] assert result.file_meta is not None - assert result.file_meta['sha1hex'] == file_meta['sha1hex'] + assert result.file_meta["sha1hex"] == file_meta["sha1hex"] if self.thumbnail_sink and result.page0_thumbnail is not None: self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) 
if self.pdftext_sink: @@ -392,7 +419,7 @@ class IngestFileWorker(SandcrawlerWorker): In the future, could extract other metadata here (like body word count), or attempting to fetch sub-resources. """ - if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml": + if self.xmldoc_sink and file_meta["mimetype"] == "application/jats+xml": try: jats_xml = xml_reserialize(resource.body) except xml.etree.ElementTree.ParseError: @@ -402,7 +429,7 @@ class IngestFileWorker(SandcrawlerWorker): status="success", jats_xml=jats_xml, ) - self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex']) + self.xmldoc_sink.push_record(msg, key=file_meta["sha1hex"]) return dict(status="success") def process_html(self, resource: ResourceResult, file_meta: dict) -> dict: @@ -416,11 +443,12 @@ class IngestFileWorker(SandcrawlerWorker): assert html_biblio html_body = html_extract_body_teixml(resource.body) html_platform = html_guess_platform(resource.terminal_url, html_doc, html_biblio) - html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, - html_body.get('word_count')) + html_scope = html_guess_scope( + resource.terminal_url, html_doc, html_biblio, html_body.get("word_count") + ) html_biblio_dict = json.loads(html_biblio.json(exclude_none=True)) - if html_scope in ('blocked-captcha', 'blocked-cookie', 'blocked-forbidden'): + if html_scope in ("blocked-captcha", "blocked-cookie", "blocked-forbidden"): return dict( status=html_scope, html_biblio=html_biblio_dict, @@ -428,8 +456,8 @@ class IngestFileWorker(SandcrawlerWorker): platform=html_platform, ) elif html_scope not in ( - 'article-fulltext', - 'unknown', + "article-fulltext", + "unknown", ): html_body.pop("tei_xml", None) return dict( @@ -440,8 +468,9 @@ class IngestFileWorker(SandcrawlerWorker): html_body=html_body, ) - raw_resources = html_extract_resources(resource.terminal_url, html_doc, - self.adblock_rules) + raw_resources = html_extract_resources( + resource.terminal_url, html_doc, self.adblock_rules + ) if len(raw_resources) > self.max_html_resources: html_body.pop("tei_xml", None) return dict( @@ -452,8 +481,8 @@ class IngestFileWorker(SandcrawlerWorker): html_body=html_body, ) - if self.htmlteixml_sink and html_body['status'] == "success": - self.htmlteixml_sink.push_record(html_body, key=file_meta['sha1hex']) + if self.htmlteixml_sink and html_body["status"] == "success": + self.htmlteixml_sink.push_record(html_body, key=file_meta["sha1hex"]) html_body.pop("tei_xml", None) @@ -470,30 +499,30 @@ class IngestFileWorker(SandcrawlerWorker): try: if self.html_quick_mode: print(" WARN: running quick CDX-only fetches", file=sys.stderr) - full_resources = quick_fetch_html_resources(raw_resources, - self.wayback_client.cdx_client, - when) + full_resources = quick_fetch_html_resources( + raw_resources, self.wayback_client.cdx_client, when + ) else: full_resources = fetch_html_resources(raw_resources, self.wayback_client, when) except PetaboxError as e: - partial_result['status'] = 'petabox-error' - partial_result['error_message'] = str(e)[:1600] + partial_result["status"] = "petabox-error" + partial_result["error_message"] = str(e)[:1600] return partial_result except CdxApiError as e: - partial_result['status'] = 'cdx-error' - partial_result['error_message'] = str(e)[:1600] + partial_result["status"] = "cdx-error" + partial_result["error_message"] = str(e)[:1600] return partial_result except WaybackError as e: - partial_result['status'] = 'wayback-error' - partial_result['error_message'] = str(e)[:1600] + 
partial_result["status"] = "wayback-error" + partial_result["error_message"] = str(e)[:1600] return partial_result except WaybackContentError as e: - partial_result['status'] = 'wayback-content-error' - partial_result['error_message'] = str(e)[:1600] + partial_result["status"] = "wayback-content-error" + partial_result["error_message"] = str(e)[:1600] return partial_result except NoCaptureError as e: - partial_result['status'] = 'html-resource-no-capture' - partial_result['error_message'] = str(e)[:1600] + partial_result["status"] = "html-resource-no-capture" + partial_result["error_message"] = str(e)[:1600] return partial_result info = dict( @@ -503,8 +532,8 @@ class IngestFileWorker(SandcrawlerWorker): platform=html_platform, html_resources=[json.loads(r.json(exclude_none=True)) for r in full_resources], ) - if html_scope == 'unknown': - info['status'] = 'unknown-scope' + if html_scope == "unknown": + info["status"] = "unknown-scope" return info def timeout_response(self, task: dict) -> dict: @@ -517,7 +546,7 @@ class IngestFileWorker(SandcrawlerWorker): ) def want(self, request: dict) -> bool: - if not request.get('ingest_type') in ('file', 'pdf', 'xml', 'html', 'src', 'component'): + if not request.get("ingest_type") in ("file", "pdf", "xml", "html", "src", "component"): return False return True @@ -527,19 +556,19 @@ class IngestFileWorker(SandcrawlerWorker): def process_file(self, request: dict, key: Any = None) -> dict: # old backwards compatibility - if request.get('ingest_type') == 'file': - request['ingest_type'] = 'pdf' + if request.get("ingest_type") == "file": + request["ingest_type"] = "pdf" - ingest_type = request.get('ingest_type') + ingest_type = request.get("ingest_type") if ingest_type not in ("pdf", "xml", "html", "src", "component"): raise NotImplementedError(f"can't handle ingest_type={ingest_type}") # parse/clean URL # note that we pass through the original/raw URL, and that is what gets # persisted in database table - base_url = clean_url(request['base_url']) + base_url = clean_url(request["base_url"]) - force_recrawl = bool(request.get('force_recrawl', False)) + force_recrawl = bool(request.get("force_recrawl", False)) for block in self.base_url_blocklist: if block in base_url: @@ -569,112 +598,113 @@ class IngestFileWorker(SandcrawlerWorker): while len(hops) <= self.max_hops: - result['hops'] = hops + result["hops"] = hops # check against blocklist again on each hop for block in self.base_url_blocklist: if block in next_url: - result['status'] = "skip-url-blocklist" + result["status"] = "skip-url-blocklist" return result # check against known loginwall URLs for block in self.wall_blocklist: if block in next_url: # TODO: blocked-wall instead of skip-wall - result['status'] = "skip-wall" + result["status"] = "skip-wall" return result # check for popular cookie blocking URL patterns. 
On successful SPN # crawls, shouldn't see these redirect URLs for pattern in self.cookie_blocklist: if pattern in next_url: - result['status'] = 'blocked-cookie' + result["status"] = "blocked-cookie" return result try: - resource = self.find_resource(next_url, - best_mimetype, - force_recrawl=force_recrawl) + resource = self.find_resource( + next_url, best_mimetype, force_recrawl=force_recrawl + ) except SavePageNowError as e: - result['status'] = 'spn2-error' - result['error_message'] = str(e)[:1600] + result["status"] = "spn2-error" + result["error_message"] = str(e)[:1600] return result except PetaboxError as e: - result['status'] = 'petabox-error' - result['error_message'] = str(e)[:1600] + result["status"] = "petabox-error" + result["error_message"] = str(e)[:1600] return result except CdxApiError as e: - result['status'] = 'cdx-error' - result['error_message'] = str(e)[:1600] + result["status"] = "cdx-error" + result["error_message"] = str(e)[:1600] # add a sleep in cdx-error path as a slow-down time.sleep(2.0) return result except WaybackError as e: - result['status'] = 'wayback-error' - result['error_message'] = str(e)[:1600] + result["status"] = "wayback-error" + result["error_message"] = str(e)[:1600] return result except WaybackContentError as e: - result['status'] = 'wayback-content-error' - result['error_message'] = str(e)[:1600] + result["status"] = "wayback-content-error" + result["error_message"] = str(e)[:1600] return result except NotImplementedError as e: - result['status'] = 'not-implemented' - result['error_message'] = str(e)[:1600] + result["status"] = "not-implemented" + result["error_message"] = str(e)[:1600] return result assert resource if resource.terminal_url: - result['terminal'] = { + result["terminal"] = { "terminal_url": resource.terminal_url, "terminal_dt": resource.terminal_dt, "terminal_status_code": resource.terminal_status_code, } - if resource.terminal_url not in result['hops']: - result['hops'].append(resource.terminal_url) + if resource.terminal_url not in result["hops"]: + result["hops"].append(resource.terminal_url) if not resource.hit: - result['status'] = resource.status + result["status"] = resource.status return result if resource.terminal_url: for pattern in self.base_url_blocklist: if pattern in resource.terminal_url: - result['status'] = 'skip-url-blocklist' + result["status"] = "skip-url-blocklist" return result if resource.terminal_url: for pattern in self.cookie_blocklist: if pattern in resource.terminal_url: - result['status'] = 'blocked-cookie' + result["status"] = "blocked-cookie" return result if not resource.body: - result['status'] = 'null-body' + result["status"] = "null-body" return result if len(resource.body) > MAX_BODY_SIZE_BYTES: - result['status'] = 'body-too-large' + result["status"] = "body-too-large" return result file_meta = gen_file_metadata(resource.body) try: file_meta, resource = fix_transfer_encoding(file_meta, resource) except Exception as e: - result['status'] = 'bad-gzip-encoding' - result['error_message'] = str(e) + result["status"] = "bad-gzip-encoding" + result["error_message"] = str(e) return result - if not resource.body or file_meta['size_bytes'] == 0: - result['status'] = 'null-body' + if not resource.body or file_meta["size_bytes"] == 0: + result["status"] = "null-body" return result # here we split based on ingest type to try and extract a next hop html_ish_resource = bool( - "html" in file_meta['mimetype'] - or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" - or "application/xml" in 
file_meta['mimetype'] - or "text/xml" in file_meta['mimetype']) + "html" in file_meta["mimetype"] + or "xhtml" in file_meta["mimetype"] # matches "application/xhtml+xml" + or "application/xml" in file_meta["mimetype"] + or "text/xml" in file_meta["mimetype"] + ) html_biblio = None html_doc = None if html_ish_resource and resource.body: @@ -682,10 +712,11 @@ class IngestFileWorker(SandcrawlerWorker): html_doc = HTMLParser(resource.body) html_biblio = html_extract_biblio(resource.terminal_url, html_doc) if html_biblio: - if 'html_biblio' not in result and html_biblio.title: - result['html_biblio'] = json.loads( - html_biblio.json(exclude_none=True)) - #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) + if "html_biblio" not in result and html_biblio.title: + result["html_biblio"] = json.loads( + html_biblio.json(exclude_none=True) + ) + # print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) except ValueError: pass @@ -700,27 +731,32 @@ class IngestFileWorker(SandcrawlerWorker): else: fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body) - result['extract_next_hop'] = fulltext_url + result["extract_next_hop"] = fulltext_url if not fulltext_url: - result['status'] = 'no-pdf-link' + result["status"] = "no-pdf-link" return result - next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url') or "" + next_url = fulltext_url.get("pdf_url") or fulltext_url.get("next_url") or "" assert next_url next_url = clean_url(next_url) - print("[PARSE {:>6}] {} {}".format( - ingest_type, - fulltext_url.get('technique'), - next_url, - ), - file=sys.stderr) + print( + "[PARSE {:>6}] {} {}".format( + ingest_type, + fulltext_url.get("technique"), + next_url, + ), + file=sys.stderr, + ) if next_url in hops: - result['status'] = 'link-loop' - result['error_message'] = "repeated: {}".format(next_url) + result["status"] = "link-loop" + result["error_message"] = "repeated: {}".format(next_url) return result hops.append(next_url) continue - elif ingest_type in ("xml", "html", - "component") and html_ish_resource and html_biblio: + elif ( + ingest_type in ("xml", "html", "component") + and html_ish_resource + and html_biblio + ): # NOTE: src_fulltext_url is not a thing next_url_found = None if ingest_type == "xml" and html_biblio.xml_fulltext_url: @@ -733,18 +769,20 @@ class IngestFileWorker(SandcrawlerWorker): if next_url_found: next_url = next_url_found technique = "html_biblio" - print("[PARSE {:>6}] {} {}".format( - ingest_type, - technique, - next_url, - ), - file=sys.stderr) + print( + "[PARSE {:>6}] {} {}".format( + ingest_type, + technique, + next_url, + ), + file=sys.stderr, + ) if next_url in hops: if ingest_type == "html": # for HTML ingest, we don't count this as a link-loop break - result['status'] = 'link-loop' - result['error_message'] = "repeated: {}".format(next_url) + result["status"] = "link-loop" + result["error_message"] = "repeated: {}".format(next_url) return result hops.append(next_url) continue @@ -753,7 +791,7 @@ class IngestFileWorker(SandcrawlerWorker): break if len(hops) >= self.max_hops: - result['status'] = "max-hops-exceeded" + result["status"] = "max-hops-exceeded" return result # fetch must be a hit if we got this far (though not necessarily an ingest hit!) 
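The hunks above reformat the core of IngestFileWorker.process_file(), which is a hop-following loop: resolve the current URL (wayback first, then SPN), bail out on blocklisted or cookie-blocked URLs, and, for HTML-ish landing pages, extract a next-hop fulltext URL and repeat, guarding against link loops and a maximum hop count. Stripped of the type-specific branches and error mapping, the control flow is roughly the following sketch; the fetch and extract_next_hop callables are placeholders standing in for find_resource() and the HTML link-extraction helpers, and the blocklist is abbreviated:

from typing import Callable, Optional

MAX_HOPS = 6
BASE_URL_BLOCKLIST = ("://hkvalidate.perfdrive.com/", "://edoc.mpg.de/")  # abbreviated


def follow_hops_sketch(
    base_url: str,
    fetch: Callable[[str], bytes],
    extract_next_hop: Callable[[str, bytes], Optional[str]],
) -> dict:
    hops = [base_url]
    next_url = base_url
    while len(hops) <= MAX_HOPS:
        # re-check the blocklist on every hop, not just the starting URL
        if any(block in next_url for block in BASE_URL_BLOCKLIST):
            return {"hit": False, "status": "skip-url-blocklist", "hops": hops}
        body = fetch(next_url)
        next_hop = extract_next_hop(next_url, body)
        if not next_hop:
            # terminal resource reached; type-specific processing happens after the loop
            return {"hit": True, "status": "success", "hops": hops}
        if next_hop in hops:
            return {"hit": False, "status": "link-loop", "hops": hops}
        hops.append(next_hop)
        next_url = next_hop
    return {"hit": False, "status": "max-hops-exceeded", "hops": hops}
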
@@ -762,38 +800,41 @@ class IngestFileWorker(SandcrawlerWorker): assert resource.terminal_status_code in (200, 226) if resource.terminal_url: - result['terminal'] = { + result["terminal"] = { "terminal_url": resource.terminal_url, "terminal_dt": resource.terminal_dt, "terminal_status_code": resource.terminal_status_code, - "terminal_sha1hex": file_meta['sha1hex'], + "terminal_sha1hex": file_meta["sha1hex"], } - result['file_meta'] = file_meta - result['cdx'] = cdx_to_dict(resource.cdx) + result["file_meta"] = file_meta + result["cdx"] = cdx_to_dict(resource.cdx) if resource.revisit_cdx: - result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx) + result["revisit_cdx"] = cdx_to_dict(resource.revisit_cdx) if ingest_type == "pdf": - if file_meta['mimetype'] != "application/pdf": - result['status'] = "wrong-mimetype" # formerly: "other-mimetype" + if file_meta["mimetype"] != "application/pdf": + result["status"] = "wrong-mimetype" # formerly: "other-mimetype" return result elif ingest_type == "xml": - if file_meta['mimetype'] not in ("application/xml", "text/xml", - "application/jats+xml"): - result['status'] = "wrong-mimetype" + if file_meta["mimetype"] not in ( + "application/xml", + "text/xml", + "application/jats+xml", + ): + result["status"] = "wrong-mimetype" return result elif ingest_type == "html": - if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"): - result['status'] = "wrong-mimetype" + if file_meta["mimetype"] not in ("text/html", "application/xhtml+xml"): + result["status"] = "wrong-mimetype" return result elif ingest_type == "src": - if file_meta['mimetype'] not in self.src_valid_mimetypes: - result['status'] = "wrong-mimetype" + if file_meta["mimetype"] not in self.src_valid_mimetypes: + result["status"] = "wrong-mimetype" return result elif ingest_type == "component": - if file_meta['mimetype'] not in self.component_valid_mimetypes: - result['status'] = "wrong-mimetype" + if file_meta["mimetype"] not in self.component_valid_mimetypes: + result["status"] = "wrong-mimetype" return result else: raise NotImplementedError() @@ -802,26 +843,30 @@ class IngestFileWorker(SandcrawlerWorker): result.update(info) # check if processing turned up an error - if info.get('status') not in ('success', None): - result['status'] = info['status'] + if info.get("status") not in ("success", None): + result["status"] = info["status"] return result - result['status'] = "success" - result['hit'] = True + result["status"] = "success" + result["hit"] = True if ingest_type == "pdf": - print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format( - ingest_type, - result.get('file_meta', {}).get('sha1hex'), - result.get('grobid', {}).get('status_code'), - result.get('pdf_meta', {}).get('status'), - ), - file=sys.stderr) + print( + "[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format( + ingest_type, + result.get("file_meta", {}).get("sha1hex"), + result.get("grobid", {}).get("status_code"), + result.get("pdf_meta", {}).get("status"), + ), + file=sys.stderr, + ) else: - print("[SUCCESS {:>5}] sha1:{}".format( - ingest_type, - result.get('file_meta', {}).get('sha1hex'), - ), - file=sys.stderr) + print( + "[SUCCESS {:>5}] sha1:{}".format( + ingest_type, + result.get("file_meta", {}).get("sha1hex"), + ), + file=sys.stderr, + ) return result @@ -832,11 +877,11 @@ class IngestFileRequestHandler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(b"404: Not Found") return - length = int(self.headers.get('content-length')) - request = 
json.loads(self.rfile.read(length).decode('utf-8')) + length = int(self.headers.get("content-length")) + request = json.loads(self.rfile.read(length).decode("utf-8")) print("Got request: {}".format(request)) ingester = IngestFileWorker() result = ingester.process(request) self.send_response(200) self.end_headers() - self.wfile.write(json.dumps(result).encode('utf8')) + self.wfile.write(json.dumps(result).encode("utf8")) diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py index 5728e24..227f511 100644 --- a/python/sandcrawler/ingest_fileset.py +++ b/python/sandcrawler/ingest_fileset.py @@ -6,15 +6,33 @@ from typing import Any, Dict, Optional import requests from selectolax.parser import HTMLParser -from sandcrawler.fileset_platforms import (ArchiveOrgHelper, DataverseHelper, FigshareHelper, - ZenodoHelper) -from sandcrawler.fileset_strategies import (ArchiveorgFilesetStrategy, ArchiveorgFileStrategy, - WebFilesetStrategy, WebFileStrategy) -from sandcrawler.fileset_types import (IngestStrategy, PlatformRestrictedError, - PlatformScopeError) +from sandcrawler.fileset_platforms import ( + ArchiveOrgHelper, + DataverseHelper, + FigshareHelper, + ZenodoHelper, +) +from sandcrawler.fileset_strategies import ( + ArchiveorgFilesetStrategy, + ArchiveorgFileStrategy, + WebFilesetStrategy, + WebFileStrategy, +) +from sandcrawler.fileset_types import ( + IngestStrategy, + PlatformRestrictedError, + PlatformScopeError, +) from sandcrawler.html_metadata import html_extract_biblio -from sandcrawler.ia import (CdxApiError, PetaboxError, SavePageNowError, WaybackContentError, - WaybackError, cdx_to_dict, fix_transfer_encoding) +from sandcrawler.ia import ( + CdxApiError, + PetaboxError, + SavePageNowError, + WaybackContentError, + WaybackError, + cdx_to_dict, + fix_transfer_encoding, +) from sandcrawler.ingest_file import IngestFileWorker from sandcrawler.misc import clean_url, gen_file_metadata from sandcrawler.workers import SandcrawlerWorker @@ -35,15 +53,16 @@ class IngestFilesetWorker(IngestFileWorker): checking to see if content has been archived already) 4. 
summarize status """ + def __init__(self, sink: Optional[SandcrawlerWorker] = None, **kwargs): super().__init__(sink=None, **kwargs) self.sink = sink self.dataset_platform_helpers = { - 'dataverse': DataverseHelper(), - 'figshare': FigshareHelper(), - 'zenodo': ZenodoHelper(), - 'archiveorg': ArchiveOrgHelper(), + "dataverse": DataverseHelper(), + "figshare": FigshareHelper(), + "zenodo": ZenodoHelper(), + "archiveorg": ArchiveOrgHelper(), } self.dataset_strategy_archivers = { IngestStrategy.ArchiveorgFileset: ArchiveorgFilesetStrategy(), @@ -52,10 +71,10 @@ class IngestFilesetWorker(IngestFileWorker): IngestStrategy.WebFile: WebFileStrategy(), } - self.max_total_size = kwargs.get('max_total_size', 64 * 1024 * 1024 * 1024) - self.max_file_count = kwargs.get('max_file_count', 200) - self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink') - self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False) + self.max_total_size = kwargs.get("max_total_size", 64 * 1024 * 1024 * 1024) + self.max_file_count = kwargs.get("max_file_count", 200) + self.ingest_file_result_sink = kwargs.get("ingest_file_result_sink") + self.ingest_file_result_stdout = kwargs.get("ingest_file_result_stdout", False) def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: """ @@ -65,7 +84,7 @@ class IngestFilesetWorker(IngestFileWorker): return None existing = self.pgrest_client.get_ingest_fileset_platform(ingest_type, base_url) # TODO: filter on more flags? - if existing and existing['hit'] is True: + if existing and existing["hit"] is True: return existing else: return None @@ -78,112 +97,114 @@ class IngestFilesetWorker(IngestFileWorker): raise NotImplementedError("process_existing() not tested or safe yet") def want(self, request: dict) -> bool: - if not request.get('ingest_type') in ('dataset', ): + if not request.get("ingest_type") in ("dataset",): return False return True - def fetch_resource_iteratively(self, ingest_type: str, base_url: str, - force_recrawl: bool) -> dict: + def fetch_resource_iteratively( + self, ingest_type: str, base_url: str, force_recrawl: bool + ) -> dict: """ This is copypasta from process_file(), should probably refactor. 
""" result: Dict[str, Any] = dict(hit=False) - result['hops'] = [base_url] + result["hops"] = [base_url] next_url = base_url # check against blocklist for block in self.base_url_blocklist: # NOTE: hack to not skip archive.org content - if 'archive.org' in block: + if "archive.org" in block: continue if block in next_url: - result['status'] = "skip-url-blocklist" + result["status"] = "skip-url-blocklist" return result try: resource = self.find_resource(next_url, force_recrawl=force_recrawl) except SavePageNowError as e: - result['status'] = 'spn2-error' - result['error_message'] = str(e)[:1600] + result["status"] = "spn2-error" + result["error_message"] = str(e)[:1600] return result except PetaboxError as e: - result['status'] = 'petabox-error' - result['error_message'] = str(e)[:1600] + result["status"] = "petabox-error" + result["error_message"] = str(e)[:1600] return result except CdxApiError as e: - result['status'] = 'cdx-error' - result['error_message'] = str(e)[:1600] + result["status"] = "cdx-error" + result["error_message"] = str(e)[:1600] # add a sleep in cdx-error path as a slow-down time.sleep(2.0) return result except WaybackError as e: - result['status'] = 'wayback-error' - result['error_message'] = str(e)[:1600] + result["status"] = "wayback-error" + result["error_message"] = str(e)[:1600] return result except WaybackContentError as e: - result['status'] = 'wayback-content-error' - result['error_message'] = str(e)[:1600] + result["status"] = "wayback-content-error" + result["error_message"] = str(e)[:1600] return result except NotImplementedError: - #result['status'] = 'not-implemented' - #result['error_message'] = str(e)[:1600] - #return result + # result['status'] = 'not-implemented' + # result['error_message'] = str(e)[:1600] + # return result resource = None html_biblio = None if resource: if resource.terminal_url: - result['terminal'] = { + result["terminal"] = { "terminal_url": resource.terminal_url, "terminal_dt": resource.terminal_dt, "terminal_status_code": resource.terminal_status_code, } - if resource.terminal_url not in result['hops']: - result['hops'].append(resource.terminal_url) + if resource.terminal_url not in result["hops"]: + result["hops"].append(resource.terminal_url) if not resource.hit: - result['status'] = resource.status + result["status"] = resource.status return result if resource.terminal_url: for pattern in self.base_url_blocklist: if pattern in resource.terminal_url: - result['status'] = 'skip-url-blocklist' + result["status"] = "skip-url-blocklist" return result if resource.terminal_url: for pattern in self.cookie_blocklist: if pattern in resource.terminal_url: - result['status'] = 'blocked-cookie' + result["status"] = "blocked-cookie" return result if not resource.body: - result['status'] = 'null-body' + result["status"] = "null-body" return result if len(resource.body) > MAX_BODY_SIZE_BYTES: - result['status'] = 'body-too-large' + result["status"] = "body-too-large" return result file_meta = gen_file_metadata(resource.body) try: file_meta, resource = fix_transfer_encoding(file_meta, resource) except Exception as e: - result['status'] = 'bad-gzip-encoding' - result['error_message'] = str(e) + result["status"] = "bad-gzip-encoding" + result["error_message"] = str(e) return result - if not resource.body or file_meta['size_bytes'] == 0: - result['status'] = 'null-body' + if not resource.body or file_meta["size_bytes"] == 0: + result["status"] = "null-body" return result # here we split based on ingest type to try and extract a next hop 
html_ish_resource = bool( - "html" in file_meta['mimetype'] - or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" - or "application/xml" in file_meta['mimetype'] - or "text/xml" in file_meta['mimetype']) + "html" in file_meta["mimetype"] + or "xhtml" in file_meta["mimetype"] # matches "application/xhtml+xml" + or "application/xml" in file_meta["mimetype"] + or "text/xml" in file_meta["mimetype"] + ) html_biblio = None html_doc = None if html_ish_resource and resource.body: @@ -191,10 +212,11 @@ class IngestFilesetWorker(IngestFileWorker): html_doc = HTMLParser(resource.body) html_biblio = html_extract_biblio(resource.terminal_url, html_doc) if html_biblio: - if 'html_biblio' not in result and html_biblio.title: - result['html_biblio'] = json.loads( - html_biblio.json(exclude_none=True)) - #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) + if "html_biblio" not in result and html_biblio.title: + result["html_biblio"] = json.loads( + html_biblio.json(exclude_none=True) + ) + # print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) except ValueError: pass @@ -204,69 +226,72 @@ class IngestFilesetWorker(IngestFileWorker): assert resource.terminal_status_code in (200, 226) if resource.terminal_url: - result['terminal'] = { + result["terminal"] = { "terminal_url": resource.terminal_url, "terminal_dt": resource.terminal_dt, "terminal_status_code": resource.terminal_status_code, - "terminal_sha1hex": file_meta['sha1hex'], + "terminal_sha1hex": file_meta["sha1hex"], } - result['file_meta'] = file_meta - result['cdx'] = cdx_to_dict(resource.cdx) + result["file_meta"] = file_meta + result["cdx"] = cdx_to_dict(resource.cdx) if resource.revisit_cdx: - result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx) + result["revisit_cdx"] = cdx_to_dict(resource.revisit_cdx) if ingest_type == "pdf": - if file_meta['mimetype'] != "application/pdf": - result['status'] = "wrong-mimetype" # formerly: "other-mimetype" + if file_meta["mimetype"] != "application/pdf": + result["status"] = "wrong-mimetype" # formerly: "other-mimetype" return result elif ingest_type == "xml": - if file_meta['mimetype'] not in ("application/xml", "text/xml", - "application/jats+xml"): - result['status'] = "wrong-mimetype" + if file_meta["mimetype"] not in ( + "application/xml", + "text/xml", + "application/jats+xml", + ): + result["status"] = "wrong-mimetype" return result elif ingest_type == "html": - if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"): - result['status'] = "wrong-mimetype" + if file_meta["mimetype"] not in ("text/html", "application/xhtml+xml"): + result["status"] = "wrong-mimetype" return result else: - #raise NotImplementedError() + # raise NotImplementedError() pass - result['_html_biblio'] = html_biblio - result['_resource'] = resource + result["_html_biblio"] = html_biblio + result["_resource"] = resource return result def process(self, request: dict, key: Any = None) -> dict: - ingest_type = request.get('ingest_type') - if ingest_type not in ("dataset", ): + ingest_type = request.get("ingest_type") + if ingest_type not in ("dataset",): raise NotImplementedError(f"can't handle ingest_type={ingest_type}") # parse/clean URL # note that we pass through the original/raw URL, and that is what gets # persisted in database table - base_url = clean_url(request['base_url']) + base_url = clean_url(request["base_url"]) - force_recrawl = bool(request.get('force_recrawl', False)) + force_recrawl = bool(request.get("force_recrawl", False)) 
print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr) # TODO: "existing" check against file and/or fileset ingest result table - #existing = self.check_existing_ingest(ingest_type, base_url) - #if existing: + # existing = self.check_existing_ingest(ingest_type, base_url) + # if existing: # return self.process_existing(request, existing) - result = self.fetch_resource_iteratively(ingest_type, - base_url, - force_recrawl=force_recrawl) - result['request'] = request - if result.get('status') is not None: - result['request'] = request + result = self.fetch_resource_iteratively( + ingest_type, base_url, force_recrawl=force_recrawl + ) + result["request"] = request + if result.get("status") is not None: + result["request"] = request return result - html_biblio = result.pop('_html_biblio') - resource = result.pop('_resource') + html_biblio = result.pop("_html_biblio") + resource = result.pop("_resource") # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page. @@ -280,166 +305,183 @@ class IngestFilesetWorker(IngestFileWorker): break if not platform_helper: - result['status'] = 'no-platform-match' + result["status"] = "no-platform-match" return result # 2. Use platform-specific methods to fetch manifest metadata and decide on an `ingest_strategy`. try: dataset_meta = platform_helper.process_request(request, resource, html_biblio) except PlatformScopeError as e: - result['status'] = 'platform-scope' - result['error_message'] = str(e)[:1600] + result["status"] = "platform-scope" + result["error_message"] = str(e)[:1600] return result except PlatformRestrictedError as e: - result['status'] = 'platform-restricted' - result['error_message'] = str(e)[:1600] + result["status"] = "platform-restricted" + result["error_message"] = str(e)[:1600] return result except NotImplementedError as e: - result['status'] = 'not-implemented' - result['error_message'] = str(e)[:1600] + result["status"] = "not-implemented" + result["error_message"] = str(e)[:1600] return result except requests.exceptions.HTTPError as e: if e.response.status_code == 404: - result['status'] = 'platform-404' - result['error_message'] = str(e)[:1600] + result["status"] = "platform-404" + result["error_message"] = str(e)[:1600] return result else: raise e - #print(dataset_meta, file=sys.stderr) + # print(dataset_meta, file=sys.stderr) platform = dataset_meta.platform_name - result['platform_name'] = dataset_meta.platform_name - result['platform_domain'] = dataset_meta.platform_domain - result['platform_id'] = dataset_meta.platform_id - result['platform_base_url'] = dataset_meta.web_base_url - result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name + result["platform_name"] = dataset_meta.platform_name + result["platform_domain"] = dataset_meta.platform_domain + result["platform_id"] = dataset_meta.platform_id + result["platform_base_url"] = dataset_meta.web_base_url + result["archiveorg_item_name"] = dataset_meta.archiveorg_item_name if not dataset_meta.manifest: - result['status'] = 'empty-manifest' + result["status"] = "empty-manifest" return result # these will get confirmed/updated after ingest - result['manifest'] = [m.dict(exclude_none=True) for m in dataset_meta.manifest] - result['file_count'] = len(dataset_meta.manifest) - result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size]) + result["manifest"] = [m.dict(exclude_none=True) for m in dataset_meta.manifest] + result["file_count"] = len(dataset_meta.manifest) + result["total_size"] = sum([m.size 
for m in dataset_meta.manifest if m.size]) - if result['total_size'] > self.max_total_size: - result['status'] = 'too-large-size' + if result["total_size"] > self.max_total_size: + result["status"] = "too-large-size" return result - if result['file_count'] > self.max_file_count: + if result["file_count"] > self.max_file_count: # hard max, to prevent downstream breakage - if result['file_count'] > 10 * 1000: - result['manifest'] = result['manifest'][:self.max_file_count] - result['status'] = 'too-many-files' + if result["file_count"] > 10 * 1000: + result["manifest"] = result["manifest"][: self.max_file_count] + result["status"] = "too-many-files" return result ingest_strategy = platform_helper.chose_strategy(dataset_meta) - result['ingest_strategy'] = ingest_strategy + result["ingest_strategy"] = ingest_strategy print( f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}", - file=sys.stderr) + file=sys.stderr, + ) strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy) if not strategy_helper: - result['status'] = 'no-strategy-helper' + result["status"] = "no-strategy-helper" return result # 3. Use strategy-specific methods to archive all files in platform manifest, and verify manifest metadata. archive_result = strategy_helper.process(dataset_meta) # 4. Summarize status and return structured result metadata. - result['status'] = archive_result.status - result['manifest'] = [m.dict(exclude_none=True) for m in archive_result.manifest] + result["status"] = archive_result.status + result["manifest"] = [m.dict(exclude_none=True) for m in archive_result.manifest] - if ingest_strategy.endswith('-fileset-bundle'): - result['fileset_bundle'] = dict() + if ingest_strategy.endswith("-fileset-bundle"): + result["fileset_bundle"] = dict() if archive_result.bundle_file_meta: - result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta + result["fileset_bundle"]["file_meta"] = archive_result.bundle_file_meta if archive_result.bundle_archiveorg_path: - result['fileset_bundle'][ - 'archiveorg_bundle_path'] = archive_result.bundle_archiveorg_path + result["fileset_bundle"][ + "archiveorg_bundle_path" + ] = archive_result.bundle_archiveorg_path if archive_result.bundle_resource: - result['fileset_bundle']['terminal'] = dict( + result["fileset_bundle"]["terminal"] = dict( terminal_url=archive_result.bundle_resource.terminal_url, terminal_dt=archive_result.bundle_resource.terminal_dt, terminal_status_code=archive_result.bundle_resource.terminal_status_code, ) if archive_result.bundle_resource.cdx: - result['fileset_bundle']['cdx'] = cdx_to_dict( - archive_result.bundle_resource.cdx) + result["fileset_bundle"]["cdx"] = cdx_to_dict( + archive_result.bundle_resource.cdx + ) if archive_result.bundle_resource.revisit_cdx: - result['fileset_bundle']['revisit_cdx'] = cdx_to_dict( - archive_result.bundle_resource.revisit_cdx) + result["fileset_bundle"]["revisit_cdx"] = cdx_to_dict( + archive_result.bundle_resource.revisit_cdx + ) - if ingest_strategy.endswith('-file'): - result['fileset_file'] = dict() + if ingest_strategy.endswith("-file"): + result["fileset_file"] = dict() if archive_result.file_file_meta: - result['fileset_file']['file_meta'] = archive_result.file_file_meta, + result["fileset_file"]["file_meta"] = (archive_result.file_file_meta,) if archive_result.file_resource: - result['fileset_file']['terminal'] = dict( + result["fileset_file"]["terminal"] = dict( 
terminal_url=archive_result.file_resource.terminal_url, terminal_dt=archive_result.file_resource.terminal_dt, terminal_status_code=archive_result.file_resource.terminal_status_code, ) if archive_result.file_resource.cdx: - result['fileset_file']['cdx'] = cdx_to_dict( - archive_result.file_resource.cdx) + result["fileset_file"]["cdx"] = cdx_to_dict( + archive_result.file_resource.cdx + ) if archive_result.file_resource.revisit_cdx: - result['fileset_file']['revisit_cdx'] = cdx_to_dict( - archive_result.file_resource.revisit_cdx) + result["fileset_file"]["revisit_cdx"] = cdx_to_dict( + archive_result.file_resource.revisit_cdx + ) - if result['status'].startswith('success'): + if result["status"].startswith("success"): # check that these are still valid - assert result['file_count'] == len(archive_result.manifest) - assert result['total_size'] == sum( - [m.size for m in archive_result.manifest if m.size]) + assert result["file_count"] == len(archive_result.manifest) + assert result["total_size"] == sum( + [m.size for m in archive_result.manifest if m.size] + ) - if result[ - 'status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta: + if ( + result["status"] == "success-file" + and archive_result.file_resource + and archive_result.file_file_meta + ): file_result: Dict[str, Any] = dict( hit=True, - status='success', + status="success", request=request.copy(), file_meta=archive_result.file_file_meta, terminal=dict( terminal_url=archive_result.file_resource.terminal_url, terminal_dt=archive_result.file_resource.terminal_dt, terminal_status_code=archive_result.file_resource.terminal_status_code, - terminal_sha1hex=archive_result.file_file_meta['sha1hex'], + terminal_sha1hex=archive_result.file_file_meta["sha1hex"], ), ) if archive_result.file_resource.cdx: - file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx) + file_result["cdx"] = cdx_to_dict(archive_result.file_resource.cdx) if archive_result.file_resource.revisit_cdx: - file_result['revisit_cdx'] = cdx_to_dict( - archive_result.file_resource.revisit_cdx) - file_result['request']['ingest_type'] = request['ingest_type'] + "-file" + file_result["revisit_cdx"] = cdx_to_dict( + archive_result.file_resource.revisit_cdx + ) + file_result["request"]["ingest_type"] = request["ingest_type"] + "-file" # call the super() (ingest_file) version of process_hit() - info = self.process_file_hit(file_result['request']['ingest_type'], - archive_result.file_resource, - archive_result.file_file_meta) + info = self.process_file_hit( + file_result["request"]["ingest_type"], + archive_result.file_resource, + archive_result.file_file_meta, + ) file_result.update(info) if self.ingest_file_result_sink: self.ingest_file_result_sink.push_record(result.copy()) elif self.ingest_file_result_stdout: sys.stdout.write(json.dumps(file_result, sort_keys=True) + "\n") - if result['status'].startswith('success'): - result['hit'] = True - print("[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format( - ingest_type, - result['file_count'], - result['total_size'], - ingest_strategy, - ), - file=sys.stderr) + if result["status"].startswith("success"): + result["hit"] = True + print( + "[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format( + ingest_type, + result["file_count"], + result["total_size"], + ingest_strategy, + ), + file=sys.stderr, + ) else: - print("[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format( - ingest_type, - result['status'], - result['file_count'], - result['total_size'], - 
ingest_strategy, - ), - file=sys.stderr) + print( + "[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format( + ingest_type, + result["status"], + result["file_count"], + result["total_size"], + ingest_strategy, + ), + file=sys.stderr, + ) return result diff --git a/python/sandcrawler/ingest_html.py b/python/sandcrawler/ingest_html.py index 91e5c6e..0ff7fe0 100644 --- a/python/sandcrawler/ingest_html.py +++ b/python/sandcrawler/ingest_html.py @@ -9,12 +9,26 @@ import pydantic import trafilatura from selectolax.parser import HTMLParser -from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio, - html_extract_resources, load_adblock_rules) -from sandcrawler.ia import (CdxApiClient, NoCaptureError, WaybackClient, WaybackContentError, - cdx_to_dict, fix_transfer_encoding) -from sandcrawler.misc import (datetime_to_cdx, gen_file_metadata, parse_cdx_datetime, - url_fuzzy_equal) +from sandcrawler.html_metadata import ( + BiblioMetadata, + html_extract_biblio, + html_extract_resources, + load_adblock_rules, +) +from sandcrawler.ia import ( + CdxApiClient, + NoCaptureError, + WaybackClient, + WaybackContentError, + cdx_to_dict, + fix_transfer_encoding, +) +from sandcrawler.misc import ( + datetime_to_cdx, + gen_file_metadata, + parse_cdx_datetime, + url_fuzzy_equal, +) TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}" @@ -23,7 +37,7 @@ def html_extract_body_teixml(doc: bytes) -> dict: try: tei_xml = trafilatura.extract( doc, - output_format='xmltei', + output_format="xmltei", include_comments=False, include_formatting=True, ) @@ -35,12 +49,11 @@ def html_extract_body_teixml(doc: bytes) -> dict: if tei_xml: body_txt = teixml_body_text(tei_xml) word_count = len(body_txt.split()) - return dict(status="success", - agent=TRAFILATURA_AGENT, - tei_xml=tei_xml, - word_count=word_count) + return dict( + status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count + ) elif doc.startswith( - b'' + b'' ): # hack for firstmonday.org return html_extract_body_teixml(doc[106:]) @@ -51,7 +64,7 @@ def html_extract_body_teixml(doc: bytes) -> dict: def teixml_body_text(doc_xml: str) -> str: ns = {"tei": "http://www.tei-c.org/ns/1.0"} tree = ET.fromstring(doc_xml) - body = tree.find('.//tei:body', ns) + body = tree.find(".//tei:body", ns) if body: return " ".join(body.itertext()) else: @@ -126,8 +139,9 @@ class HtmlMetaRow(pydantic.BaseModel): ) -def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, - when: Optional[datetime.datetime]) -> List[WebResource]: +def quick_fetch_html_resources( + resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime] +) -> List[WebResource]: """ This is the lazy version that just does a CDX lookup for each resource. 
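The ingest_html.py hunk above also covers html_extract_body_teixml(), which runs trafilatura with TEI-XML output and derives a word count from the TEI <body> element. A minimal sketch of that extraction, assuming trafilatura is installed and omitting the error handling and the firstmonday.org workaround shown in the diff:

import xml.etree.ElementTree as ET

import trafilatura

TEI_NS = {"tei": "http://www.tei-c.org/ns/1.0"}


def extract_tei_sketch(doc: bytes) -> dict:
    # extract the document body as TEI-XML, matching the options used above
    tei_xml = trafilatura.extract(
        doc,
        output_format="xmltei",
        include_comments=False,
        include_formatting=True,
    )
    if not tei_xml:
        return {"status": "empty-body"}
    # count words in the TEI <body>, as teixml_body_text() does
    body = ET.fromstring(tei_xml).find(".//tei:body", TEI_NS)
    body_txt = " ".join(body.itertext()) if body is not None else ""
    return {"status": "success", "tei_xml": tei_xml, "word_count": len(body_txt.split())}
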
@@ -138,12 +152,13 @@ def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, full = [] closest = when and datetime_to_cdx(when) for resource in resources: - cdx_row = cdx_client.lookup_best(resource['url'], closest=closest) + cdx_row = cdx_client.lookup_best(resource["url"], closest=closest) if not cdx_row: raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") - if cdx_row.url != resource['url'] and not url_fuzzy_equal(cdx_row.url, resource['url']): - print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", - file=sys.stderr) + if cdx_row.url != resource["url"] and not url_fuzzy_equal(cdx_row.url, resource["url"]): + print( + f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr + ) if not cdx_row.status_code: # TODO: fall back to a full fetch? print(" WARN: skipping revisit record", file=sys.stderr) @@ -158,14 +173,16 @@ def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, status_code=cdx_row.status_code, size=None, sha256hex=None, - resource_type=resource['type'], - )) + resource_type=resource["type"], + ) + ) return full -def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, - when: Optional[datetime.datetime]) -> List[WebResource]: +def fetch_html_resources( + resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime] +) -> List[WebResource]: """ This is the full version which fetches each resource from wayback/petabox and calculates additional hashes. @@ -176,11 +193,11 @@ def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, full = [] closest = when and datetime_to_cdx(when) for resource in resources: - wayback_resp = wayback_client.lookup_resource(resource['url'], closest=closest) - if not wayback_resp or wayback_resp.status != 'success': + wayback_resp = wayback_client.lookup_resource(resource["url"], closest=closest) + if not wayback_resp or wayback_resp.status != "success": raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True) - if file_meta['sha1hex'] != wayback_resp.cdx.sha1hex: + if file_meta["sha1hex"] != wayback_resp.cdx.sha1hex: raise WaybackContentError( f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}" ) @@ -189,25 +206,27 @@ def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, surt=wayback_resp.cdx.surt, timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime), url=wayback_resp.cdx.url, - sha1hex=file_meta['sha1hex'], - mimetype=file_meta['mimetype'], + sha1hex=file_meta["sha1hex"], + mimetype=file_meta["mimetype"], status_code=wayback_resp.cdx.status_code or wayback_resp.revisit_cdx.status_code, - size=file_meta['size_bytes'], - sha256hex=file_meta['sha256hex'], - resource_type=resource['type'], - )) + size=file_meta["size_bytes"], + sha256hex=file_meta["sha256hex"], + resource_type=resource["type"], + ) + ) return full -def html_guess_platform(url: str, doc: HTMLParser, - biblio: Optional[BiblioMetadata]) -> Optional[str]: +def html_guess_platform( + url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata] +) -> Optional[str]: generator: Optional[str] = None generator_elem = doc.css_first("meta[name='generator']") if generator_elem: - generator = generator_elem.attrs['content'] + generator = generator_elem.attrs["content"] else: generator_elem = doc.css_first("a[id='developedBy']") if generator_elem: @@ -226,7 +245,10 @@ def 
html_guess_platform(url: str, doc: HTMLParser, return "ojs" else: try: - if 'powered by PKP OJS' in doc.html: + if ( + 'powered by PKP OJS' + in doc.html + ): return "ojs" if 'Powered by ' in doc.html: return "arpha" @@ -236,20 +258,21 @@ def html_guess_platform(url: str, doc: HTMLParser, pass icon_elem = doc.css_first("link[type='image/x-icon']") - if icon_elem and 'href' in icon_elem.attrs: - if 'journalssystem.com' in icon_elem.attrs['href']: + if icon_elem and "href" in icon_elem.attrs: + if "journalssystem.com" in icon_elem.attrs["href"]: return "journalssystem.com" - elif 'indexcopernicus.com' in icon_elem.attrs['href']: + elif "indexcopernicus.com" in icon_elem.attrs["href"]: return "indexcopernicus" - if 'scielo' in url: + if "scielo" in url: return "scielo" return None -def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], - word_count: Optional[int]) -> str: +def html_guess_scope( + url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int] +) -> str: """ This function tries to guess if an HTML document represents one of: @@ -275,7 +298,7 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata] """ # assert that this is a real URL - assert url.count('/') >= 2 + assert url.count("/") >= 2 # basic paywall and loginwall detection based on URL if url.endswith("/cookieAbsent"): @@ -293,7 +316,7 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata] return "blocked-captcha" # is this the top-level URL of the domain? aka, no path? - if url.count('/') <= 2 or (url.count('/') == 3) and url.endswith('/'): + if url.count("/") <= 2 or (url.count("/") == 3) and url.endswith("/"): return "homepage-domain" platform = html_guess_platform(url, doc, biblio) @@ -340,7 +363,7 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata] if word_count is not None: if word_count < 20: return "stub" - elif word_count > 500 and platform in ['wordpress', 'blogger']: + elif word_count > 500 and platform in ["wordpress", "blogger"]: return "article-fulltext" elif word_count > 1200: return "article-fulltext" @@ -348,9 +371,9 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata] return "unknown" -def run_single(url: str, - timestamp: Optional[str] = None, - quick_mode: bool = False) -> IngestWebResult: +def run_single( + url: str, timestamp: Optional[str] = None, quick_mode: bool = False +) -> IngestWebResult: adblock = load_adblock_rules() wayback_client = WaybackClient() @@ -368,7 +391,7 @@ def run_single(url: str, file_meta = gen_file_metadata(html_resource.body) file_meta, html_resource = fix_transfer_encoding(file_meta, html_resource) - if file_meta['mimetype'] not in ("text/html", "text/xml"): + if file_meta["mimetype"] not in ("text/html", "text/xml"): return IngestWebResult( status="wrong-mimetype", hit=False, @@ -379,8 +402,8 @@ def run_single(url: str, html_doc = HTMLParser(html_resource.body) html_biblio = html_extract_biblio(url, html_doc) html_body = html_extract_body_teixml(html_resource.body) - html_scope = html_guess_scope(url, html_doc, html_biblio, html_body.get('word_count')) - if html_scope not in ('article-fulltext', 'unknown'): + html_scope = html_guess_scope(url, html_doc, html_biblio, html_body.get("word_count")) + if html_scope not in ("article-fulltext", "unknown"): return IngestWebResult( status="wrong-scope", hit=False, @@ -397,8 +420,9 @@ def run_single(url: str, full_resources: List[WebResource] = [] if quick_mode: - 
full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, - when) + full_resources = quick_fetch_html_resources( + raw_resources, wayback_client.cdx_client, when + ) else: full_resources = fetch_html_resources(raw_resources, wayback_client, when) @@ -425,8 +449,9 @@ def main() -> None: parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) subparsers = parser.add_subparsers() - sub = subparsers.add_parser("single", - help="tries to ingest a single URL, dumps result to stdout") + sub = subparsers.add_parser( + "single", help="tries to ingest a single URL, dumps result to stdout" + ) sub.set_defaults(func="run_single") sub.add_argument( "url", @@ -453,8 +478,8 @@ def main() -> None: result = run_single(args.url, args.timestamp, args.quick_mode) print(result.json(indent=2, exclude_none=True)) else: - #func = getattr(wp, args.func) - #func() + # func = getattr(wp, args.func) + # func() raise NotImplementedError() diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py index 1967ba3..d47ab89 100644 --- a/python/sandcrawler/minio.py +++ b/python/sandcrawler/minio.py @@ -6,11 +6,13 @@ import minio class SandcrawlerMinioClient(object): - def __init__(self, - host_url: str, - access_key: str, - secret_key: str, - default_bucket: Optional[str] = None): + def __init__( + self, + host_url: str, + access_key: str, + secret_key: str, + default_bucket: Optional[str] = None, + ): """ host is minio connection string (host:port) access and secret key are as expected @@ -46,13 +48,15 @@ class SandcrawlerMinioClient(object): ) return obj_path - def put_blob(self, - folder: str, - blob: Union[str, bytes], - sha1hex: Optional[str] = None, - extension: str = "", - prefix: str = "", - bucket: Optional[str] = None) -> Tuple[str, str]: + def put_blob( + self, + folder: str, + blob: Union[str, bytes], + sha1hex: Optional[str] = None, + extension: str = "", + prefix: str = "", + bucket: Optional[str] = None, + ) -> Tuple[str, str]: """ blob should be bytes sha1hex is assumed to be sha1 of the blob itself; if not supplied it will be calculated @@ -61,7 +65,7 @@ class SandcrawlerMinioClient(object): filename is SHA1 with an optional file extension. 
""" if type(blob) == str: - blob = blob.encode('utf-8') + blob = blob.encode("utf-8") assert type(blob) == bytes if not sha1hex: h = hashlib.sha1() @@ -72,13 +76,13 @@ class SandcrawlerMinioClient(object): bucket = self.default_bucket assert bucket content_type = "application/octet-stream" - if extension.endswith('.xml'): + if extension.endswith(".xml"): content_type = "application/xml" - if extension.endswith('.png'): + if extension.endswith(".png"): content_type = "image/png" - elif extension.endswith('.jpg') or extension.endswith('.jpeg'): + elif extension.endswith(".jpg") or extension.endswith(".jpeg"): content_type = "image/jpeg" - elif extension.endswith('.txt'): + elif extension.endswith(".txt"): content_type = "text/plain" self.mc.put_object( bucket, @@ -89,12 +93,14 @@ class SandcrawlerMinioClient(object): ) return (bucket, obj_path) - def get_blob(self, - folder: str, - sha1hex: str, - extension: str = "", - prefix: str = "", - bucket: str = None) -> bytes: + def get_blob( + self, + folder: str, + sha1hex: str, + extension: str = "", + prefix: str = "", + bucket: str = None, + ) -> bytes: """ sha1hex is sha1 of the blob itself diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py index 1c779ce..db001dd 100644 --- a/python/sandcrawler/misc.py +++ b/python/sandcrawler/misc.py @@ -15,7 +15,7 @@ def clean_url(s: str) -> str: s = s.strip() parsed = urlcanon.parse_url(s) if not parsed.port and parsed.colon_before_port: - parsed.colon_before_port = b'' + parsed.colon_before_port = b"" return str(urlcanon.whatwg(parsed)) @@ -23,10 +23,12 @@ def url_fuzzy_equal(left: str, right: str) -> bool: """ TODO: use proper surt library and canonicalization for this check """ - fuzzy_left = '://'.join( - clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:]) - fuzzy_right = '://'.join( - clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:]) + fuzzy_left = "://".join( + clean_url(left).replace("www.", "").replace(":80/", "/").split("://")[1:] + ) + fuzzy_right = "://".join( + clean_url(right).replace("www.", "").replace(":80/", "/").split("://")[1:] + ) if fuzzy_left == fuzzy_right: return True elif fuzzy_left == fuzzy_right + "/" or fuzzy_right == fuzzy_left + "/": @@ -35,10 +37,13 @@ def url_fuzzy_equal(left: str, right: str) -> bool: def test_url_fuzzy_equal() -> None: - assert url_fuzzy_equal( - "http://www.annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree", - "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree" - ) is True + assert ( + url_fuzzy_equal( + "http://www.annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree", + "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree", + ) + is True + ) def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict: @@ -53,7 +58,7 @@ def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict: if len(blob) < 1024 * 1024: mimetype = magic.Magic(mime=True).from_buffer(blob) else: - mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024 * 1024)]) + mimetype = magic.Magic(mime=True).from_buffer(blob[: (1024 * 1024)]) if mimetype in ("application/xml", "text/xml"): # crude checks for XHTML or JATS XML, using only first 1 kB of file if b" dict: assert path is not None mimetype = magic.Magic(mime=True).from_file(path) if mimetype in 
("application/xml", "text/xml"): - with open(path, 'rb') as f: + with open(path, "rb") as f: blob = f.read(1024) # crude checks for XHTML or JATS XML, using only first 1 kB of file - if b" dict: hashlib.md5(), ] size_bytes = 0 - with open(path, 'rb') as f: + with open(path, "rb") as f: while True: chunk = f.read(1024 * 1024) if not chunk: @@ -128,15 +136,15 @@ def b32_hex(s: str) -> str: if len(s) == 40: return s raise ValueError("not a base-32 encoded SHA-1 hash: {}".format(s)) - return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + return base64.b16encode(base64.b32decode(s.upper())).lower().decode("utf-8") NORMAL_MIME = ( - 'application/pdf', - 'application/postscript', - 'text/html', - 'text/xml', - 'application/octet-stream', + "application/pdf", + "application/postscript", + "text/html", + "text/xml", + "application/octet-stream", ) @@ -147,22 +155,22 @@ def normalize_mime(raw: str) -> Optional[str]: return norm # Special cases - if raw.startswith('application/xml'): - return 'text/xml' - if raw.startswith('application/x-pdf'): - return 'application/pdf' - if raw in ('.pdf', ): - return 'application/pdf' + if raw.startswith("application/xml"): + return "text/xml" + if raw.startswith("application/x-pdf"): + return "application/pdf" + if raw in (".pdf",): + return "application/pdf" if raw in ( - 'application/download', - 'binary/octet-stream', - 'unk', - 'application/x-download', - 'application/octetstream', - 'application/force-download', - 'application/unknown', + "application/download", + "binary/octet-stream", + "unk", + "application/x-download", + "application/octetstream", + "application/force-download", + "application/unknown", ): - return 'application/octet-stream' + return "application/octet-stream" return None @@ -200,14 +208,19 @@ def parse_cdx_line(raw_cdx: str, normalize: bool = True) -> Optional[dict]: offset = cdx[9] warc = cdx[10] - if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit() and len(sha1b32) == 32 - and dt.isdigit()): + if not ( + sha1b32.isalnum() + and c_size.isdigit() + and offset.isdigit() + and len(sha1b32) == 32 + and dt.isdigit() + ): return None - if '-' in (surt, dt, url, http_status, sha1b32, c_size, offset, warc): + if "-" in (surt, dt, url, http_status, sha1b32, c_size, offset, warc): return None - if mime is None or mime == '-': + if mime is None or mime == "-": mime = "application/octet-stream" if normalize: @@ -242,16 +255,13 @@ def test_parse_cdx_datetime() -> None: assert parse_cdx_datetime("") is None assert parse_cdx_datetime("asdf") is None assert parse_cdx_datetime("19930203123045") is not None - assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020, - month=10, - day=28, - hour=23, - minute=51, - second=3) + assert parse_cdx_datetime("20201028235103") == datetime.datetime( + year=2020, month=10, day=28, hour=23, minute=51, second=3 + ) def datetime_to_cdx(dt: datetime.datetime) -> str: - return '%04d%02d%02d%02d%02d%02d' % ( + return "%04d%02d%02d%02d%02d%02d" % ( dt.year, dt.month, dt.day, @@ -263,13 +273,16 @@ def datetime_to_cdx(dt: datetime.datetime) -> str: def test_datetime_to_cdx() -> None: assert "20201028235103" == datetime_to_cdx( - datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)) + datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3) + ) -def requests_retry_session(retries: int = 10, - backoff_factor: int = 3, - status_forcelist: List[int] = [500, 502, 504], - session: requests.Session = None) -> requests.Session: 
+def requests_retry_session( + retries: int = 10, + backoff_factor: int = 3, + status_forcelist: List[int] = [500, 502, 504], + session: requests.Session = None, +) -> requests.Session: """ From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests """ @@ -282,8 +295,8 @@ def requests_retry_session(retries: int = 10, status_forcelist=status_forcelist, ) adapter = HTTPAdapter(max_retries=retry) - session.mount('http://', adapter) - session.mount('https://', adapter) + session.mount("http://", adapter) + session.mount("https://", adapter) return session diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py index 1d306d3..6c18395 100644 --- a/python/sandcrawler/pdfextract.py +++ b/python/sandcrawler/pdfextract.py @@ -173,64 +173,69 @@ class PdfExtractResult: Outputs a JSON string as would be published to Kafka text/info topic. """ return { - 'key': self.sha1hex, - 'sha1hex': self.sha1hex, - 'status': self.status, - 'file_meta': self.file_meta, - 'error_msg': self.error_msg, - 'text': self.text, - 'has_page0_thumbnail': self.has_page0_thumbnail, - 'meta_xml': self.meta_xml, - 'pdf_info': self.pdf_info, - 'pdf_extra': self.pdf_extra, - 'source': self.source, + "key": self.sha1hex, + "sha1hex": self.sha1hex, + "status": self.status, + "file_meta": self.file_meta, + "error_msg": self.error_msg, + "text": self.text, + "has_page0_thumbnail": self.has_page0_thumbnail, + "meta_xml": self.meta_xml, + "pdf_info": self.pdf_info, + "pdf_extra": self.pdf_extra, + "source": self.source, } @staticmethod - def from_pdftext_dict(record: Dict[str, Any]) -> 'PdfExtractResult': + def from_pdftext_dict(record: Dict[str, Any]) -> "PdfExtractResult": """ Outputs a JSON string as would be published to Kafka text/info topic. """ - if record['status'] != 'success': + if record["status"] != "success": return PdfExtractResult( - sha1hex=record.get('sha1hex') or record['key'], - status=record['status'], - error_msg=record.get('error_msg'), + sha1hex=record.get("sha1hex") or record["key"], + status=record["status"], + error_msg=record.get("error_msg"), ) else: return PdfExtractResult( - sha1hex=record['sha1hex'], - status=record['status'], - file_meta=record.get('file_meta'), - text=record.get('text'), - has_page0_thumbnail=bool(record.get('has_page0_thumbnail', False)), - meta_xml=record.get('meta_xml'), - pdf_info=record.get('pdf_info'), - pdf_extra=record.get('pdf_extra'), + sha1hex=record["sha1hex"], + status=record["status"], + file_meta=record.get("file_meta"), + text=record.get("text"), + has_page0_thumbnail=bool(record.get("has_page0_thumbnail", False)), + meta_xml=record.get("meta_xml"), + pdf_info=record.get("pdf_info"), + pdf_extra=record.get("pdf_extra"), ) @staticmethod - def from_pdf_meta_dict(record: Dict[str, Any]) -> 'PdfExtractResult': + def from_pdf_meta_dict(record: Dict[str, Any]) -> "PdfExtractResult": """ Parses what would be returned from postgrest """ - if record['status'] != 'success': + if record["status"] != "success": return PdfExtractResult( - sha1hex=record['sha1hex'], - status=record['status'], - error_msg=(record.get('metadata') or {}).get('error_msg'), + sha1hex=record["sha1hex"], + status=record["status"], + error_msg=(record.get("metadata") or {}).get("error_msg"), ) else: pdf_extra = dict() - for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id', - 'pdf_version'): + for k in ( + "page_count", + "page0_height", + "page0_width", + "permanent_id", + "pdf_version", + ): if record.get(k): pdf_extra[k] = record[k] return 
PdfExtractResult( - sha1hex=record['sha1hex'], - status=record['status'], - has_page0_thumbnail=bool(record.get('has_page0_thumbnail', False)), - pdf_info=record.get('metadata'), + sha1hex=record["sha1hex"], + status=record["status"], + has_page0_thumbnail=bool(record.get("has_page0_thumbnail", False)), + pdf_info=record.get("metadata"), pdf_extra=pdf_extra, ) @@ -247,11 +252,11 @@ class PdfExtractResult: # TODO: form, encrypted if self.pdf_info: metadata = dict() - for k in ('Title', 'Subject', 'Author', 'Creator', 'Producer', 'doi'): + for k in ("Title", "Subject", "Author", "Creator", "Producer", "doi"): if k in self.pdf_info: metadata[k.lower()] = self.pdf_info[k] - if 'CreationDate' in self.pdf_info: - pdf_created = self.pdf_info['CreationDate'] + if "CreationDate" in self.pdf_info: + pdf_created = self.pdf_info["CreationDate"] metadata_json: Optional[str] = None if metadata: metadata_json = json.dumps(metadata, sort_keys=True) @@ -260,20 +265,20 @@ class PdfExtractResult: datetime.datetime.now(), # updated self.status, self.has_page0_thumbnail, - pdf_extra.get('page_count'), + pdf_extra.get("page_count"), word_count, - pdf_extra.get('page0_height'), - pdf_extra.get('page0_width'), - pdf_extra.get('permanent_id'), + pdf_extra.get("page0_height"), + pdf_extra.get("page0_width"), + pdf_extra.get("permanent_id"), pdf_created, - pdf_extra.get('pdf_version'), + pdf_extra.get("pdf_version"), metadata_json, ) -def process_pdf(blob: bytes, - thumb_size: Tuple[int, int] = (180, 300), - thumb_type: str = "JPEG") -> PdfExtractResult: +def process_pdf( + blob: bytes, thumb_size: Tuple[int, int] = (180, 300), thumb_type: str = "JPEG" +) -> PdfExtractResult: """ A known issue is that output text is in "physical layout" mode, which means columns will be side-by-side. We would prefer a single stream of tokens! @@ -283,11 +288,11 @@ def process_pdf(blob: bytes, didn't seem to work at all (returned empty strings). 
""" file_meta = gen_file_metadata(blob) - sha1hex = file_meta['sha1hex'] - if file_meta['mimetype'] != 'application/pdf': + sha1hex = file_meta["sha1hex"] + if file_meta["mimetype"] != "application/pdf": return PdfExtractResult( sha1hex=sha1hex, - status='not-pdf', + status="not-pdf", error_msg=f"mimetype is '{file_meta['mimetype']}'", file_meta=file_meta, ) @@ -295,7 +300,7 @@ def process_pdf(blob: bytes, if sha1hex in BAD_PDF_SHA1HEX: return PdfExtractResult( sha1hex=sha1hex, - status='bad-pdf', + status="bad-pdf", error_msg="PDF known to cause processing issues", file_meta=file_meta, ) @@ -306,7 +311,7 @@ def process_pdf(blob: bytes, if pdf is None: return PdfExtractResult( sha1hex=sha1hex, - status='empty-pdf', + status="empty-pdf", file_meta=file_meta, has_page0_thumbnail=False, ) @@ -314,7 +319,7 @@ def process_pdf(blob: bytes, if page0 is None: return PdfExtractResult( sha1hex=sha1hex, - status='empty-page0', + status="empty-page0", file_meta=file_meta, ) # this call sometimes fails an returns an AttributeError @@ -324,7 +329,7 @@ def process_pdf(blob: bytes, # starting with a narrow set return PdfExtractResult( sha1hex=sha1hex, - status='parse-error', + status="parse-error", error_msg=str(e), file_meta=file_meta, ) @@ -334,8 +339,9 @@ def process_pdf(blob: bytes, renderer = poppler.PageRenderer() try: full_img = renderer.render_page(page0) - img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw', - "BGRA", 0, 1) + img = Image.frombuffer( + "RGBA", (full_img.width, full_img.height), full_img.data, "raw", "BGRA", 0, 1 + ) img.thumbnail(thumb_size, Image.BICUBIC) buf = BytesIO() img.save(buf, thumb_type) @@ -355,7 +361,7 @@ def process_pdf(blob: bytes, except AttributeError as e: return PdfExtractResult( sha1hex=sha1hex, - status='parse-error', + status="parse-error", error_msg=str(e), file_meta=file_meta, ) @@ -364,14 +370,14 @@ def process_pdf(blob: bytes, if len(full_text) > 1000000: return PdfExtractResult( sha1hex=sha1hex, - status='text-too-large', + status="text-too-large", error_msg="full_text chars: {}".format(len(full_text)), file_meta=file_meta, ) if len(pdf.metadata) > 1000000: return PdfExtractResult( sha1hex=sha1hex, - status='text-too-large', + status="text-too-large", error_msg="meta_xml chars: {}".format(len(full_text)), file_meta=file_meta, ) @@ -381,7 +387,7 @@ def process_pdf(blob: bytes, except UnicodeDecodeError: return PdfExtractResult( sha1hex=sha1hex, - status='bad-unicode', + status="bad-unicode", error_msg="in infos()", file_meta=file_meta, ) @@ -402,7 +408,7 @@ def process_pdf(blob: bytes, return PdfExtractResult( sha1hex=sha1hex, file_meta=file_meta, - status='success', + status="success", error_msg=None, text=full_text or None, has_page0_thumbnail=page0_thumbnail is not None, @@ -421,17 +427,19 @@ def process_pdf(blob: bytes, class PdfExtractWorker(SandcrawlerFetchWorker): - def __init__(self, - wayback_client: Optional[WaybackClient] = None, - sink: Optional[SandcrawlerWorker] = None, - **kwargs): + def __init__( + self, + wayback_client: Optional[WaybackClient] = None, + sink: Optional[SandcrawlerWorker] = None, + **kwargs, + ): super().__init__(wayback_client=wayback_client) self.wayback_client = wayback_client self.sink = sink - self.thumbnail_sink = kwargs.get('thumbnail_sink') + self.thumbnail_sink = kwargs.get("thumbnail_sink") def timeout_response(self, task: Dict[str, Any]) -> Dict[str, Any]: - default_key = task['sha1hex'] + default_key = task["sha1hex"] return dict( status="error-timeout", error_msg="internal 
pdf-extract worker timeout", @@ -441,9 +449,9 @@ class PdfExtractWorker(SandcrawlerFetchWorker): def process(self, record: Any, key: Optional[str] = None) -> dict: fetch_result = self.fetch_blob(record) - if fetch_result['status'] != 'success': + if fetch_result["status"] != "success": return fetch_result - blob: bytes = fetch_result['blob'] + blob: bytes = fetch_result["blob"] assert blob and isinstance(blob, bytes) result = process_pdf(blob) @@ -458,10 +466,11 @@ class PdfExtractBlobWorker(SandcrawlerWorker): This is sort of like PdfExtractWorker, except it receives blobs directly, instead of fetching blobs from some remote store. """ + def __init__(self, sink: Optional[SandcrawlerWorker] = None, **kwargs): super().__init__() self.sink = sink - self.thumbnail_sink = kwargs.get('thumbnail_sink') + self.thumbnail_sink = kwargs.get("thumbnail_sink") def process(self, blob: Any, key: Optional[str] = None) -> Any: if not blob: diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py index 138e65c..d765164 100644 --- a/python/sandcrawler/pdftrio.py +++ b/python/sandcrawler/pdftrio.py @@ -32,37 +32,37 @@ class PdfTrioClient(object): pdftrio_response = requests.post( self.host_url + "/classify/research-pub/" + mode, files={ - 'pdf_content': blob, + "pdf_content": blob, }, timeout=60.0, ) except requests.Timeout: return { - 'status': 'error-timeout', - 'status_code': -4, # heritrix3 "HTTP timeout" code - 'error_msg': 'pdftrio request (HTTP POST) timeout', + "status": "error-timeout", + "status_code": -4, # heritrix3 "HTTP timeout" code + "error_msg": "pdftrio request (HTTP POST) timeout", } except requests.exceptions.ConnectionError: # crude back-off time.sleep(2.0) return { - 'status': 'error-connect', - 'status_code': -2, # heritrix3 "HTTP connect" code - 'error_msg': 'pdftrio request connection timout', + "status": "error-connect", + "status_code": -2, # heritrix3 "HTTP connect" code + "error_msg": "pdftrio request connection timout", } info: Dict[str, Any] = dict(status_code=pdftrio_response.status_code) if pdftrio_response.status_code == 200: resp_json = pdftrio_response.json() - assert 'ensemble_score' in resp_json - assert 'status' in resp_json - assert 'versions' in resp_json + assert "ensemble_score" in resp_json + assert "status" in resp_json + assert "versions" in resp_json info.update(resp_json) else: - info['status'] = 'error' + info["status"] = "error" # TODO: might return JSON with some info? 
- info['_total_sec'] = pdftrio_response.elapsed.total_seconds() + info["_total_sec"] = pdftrio_response.elapsed.total_seconds() return info @@ -70,11 +70,14 @@ class PdfTrioWorker(SandcrawlerFetchWorker): """ This class is basically copied directly from GrobidWorker """ - def __init__(self, - pdftrio_client: PdfTrioClient, - wayback_client: Optional[WaybackClient] = None, - sink: Optional[SandcrawlerWorker] = None, - **kwargs): + + def __init__( + self, + pdftrio_client: PdfTrioClient, + wayback_client: Optional[WaybackClient] = None, + sink: Optional[SandcrawlerWorker] = None, + **kwargs + ): super().__init__(wayback_client=wayback_client, **kwargs) self.pdftrio_client = pdftrio_client self.sink = sink @@ -86,22 +89,22 @@ class PdfTrioWorker(SandcrawlerFetchWorker): start = time.time() fetch_result = self.fetch_blob(record) fetch_sec = time.time() - start - if fetch_result['status'] != 'success': + if fetch_result["status"] != "success": return fetch_result - blob: bytes = fetch_result['blob'] + blob: bytes = fetch_result["blob"] assert blob and isinstance(blob, bytes) result = dict() - result['file_meta'] = gen_file_metadata(blob) - result['key'] = result['file_meta']['sha1hex'] - result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob) - result['source'] = record - result['timing'] = dict( - pdftrio_sec=result['pdf_trio'].pop('_total_sec', None), + result["file_meta"] = gen_file_metadata(blob) + result["key"] = result["file_meta"]["sha1hex"] + result["pdf_trio"] = self.pdftrio_client.classify_pdf(blob) + result["source"] = record + result["timing"] = dict( + pdftrio_sec=result["pdf_trio"].pop("_total_sec", None), total_sec=time.time() - start_process, ) if fetch_sec: - result['timing']['fetch_sec'] = fetch_sec + result["timing"]["fetch_sec"] = fetch_sec return result @@ -110,11 +113,14 @@ class PdfTrioBlobWorker(SandcrawlerWorker): This is sort of like PdfTrioWorker, except it receives blobs directly, instead of fetching blobs from some remote store. 
""" - def __init__(self, - pdftrio_client: PdfTrioClient, - sink: Optional[SandcrawlerWorker] = None, - mode: str = "auto", - **kwargs): + + def __init__( + self, + pdftrio_client: PdfTrioClient, + sink: Optional[SandcrawlerWorker] = None, + mode: str = "auto", + **kwargs + ): super().__init__(**kwargs) self.pdftrio_client = pdftrio_client self.sink = sink @@ -126,11 +132,11 @@ class PdfTrioBlobWorker(SandcrawlerWorker): return None assert isinstance(blob, bytes) result = dict() - result['file_meta'] = gen_file_metadata(blob) - result['key'] = result['file_meta']['sha1hex'] - result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob, mode=self.mode) - result['timing'] = dict( - pdftrio_sec=result['pdf_trio'].pop('_total_sec', None), + result["file_meta"] = gen_file_metadata(blob) + result["key"] = result["file_meta"]["sha1hex"] + result["pdf_trio"] = self.pdftrio_client.classify_pdf(blob, mode=self.mode) + result["timing"] = dict( + pdftrio_sec=result["pdf_trio"].pop("_total_sec", None), total_sec=time.time() - start_process, ) return result diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 8ec5979..f7954b1 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -41,14 +41,14 @@ class PersistCdxWorker(SandcrawlerWorker): raise NotImplementedError def push_batch(self, batch: list) -> list: - self.counts['total'] += len(batch) + self.counts["total"] += len(batch) # filter to full CDX lines, no liveweb - cdx_batch = [r for r in batch if r.get('warc_path') and ("/" in r['warc_path'])] + cdx_batch = [r for r in batch if r.get("warc_path") and ("/" in r["warc_path"])] resp = self.db.insert_cdx(self.cur, cdx_batch) if len(cdx_batch) < len(batch): - self.counts['skip'] += len(batch) - len(cdx_batch) - self.counts['insert-cdx'] += resp[0] - self.counts['update-cdx'] += resp[1] + self.counts["skip"] += len(batch) - len(cdx_batch) + self.counts["insert-cdx"] += resp[0] + self.counts["update-cdx"] += resp[1] self.db.commit() return [] @@ -70,47 +70,52 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if there is a problem with conversion, return None """ # backwards compat hacks; transform request to look like current schema - if raw.get('ingest_type') == 'file': - raw['ingest_type'] = 'pdf' - if (not raw.get('link_source') and raw.get('base_url') - and raw.get('ext_ids', {}).get('doi') - and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])): + if raw.get("ingest_type") == "file": + raw["ingest_type"] = "pdf" + if ( + not raw.get("link_source") + and raw.get("base_url") + and raw.get("ext_ids", {}).get("doi") + and raw["base_url"] == "https://doi.org/{}".format(raw["ext_ids"]["doi"]) + ): # set link_source(_id) for old ingest requests - raw['link_source'] = 'doi' - raw['link_source_id'] = raw['ext_ids']['doi'] - if (not raw.get('link_source') - and raw.get('ingest_request_source', '').startswith('savepapernow') - and raw.get('fatcat', {}).get('release_ident')): + raw["link_source"] = "doi" + raw["link_source_id"] = raw["ext_ids"]["doi"] + if ( + not raw.get("link_source") + and raw.get("ingest_request_source", "").startswith("savepapernow") + and raw.get("fatcat", {}).get("release_ident") + ): # set link_source(_id) for old ingest requests - raw['link_source'] = 'spn' - raw['link_source_id'] = raw['fatcat']['release_ident'] + raw["link_source"] = "spn" + raw["link_source_id"] = raw["fatcat"]["release_ident"] - for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'): + for k in ("ingest_type", 
"base_url", "link_source", "link_source_id"): if k not in raw: - self.counts['skip-request-fields'] += 1 + self.counts["skip-request-fields"] += 1 return None - if raw['ingest_type'] not in ('pdf', 'xml', 'html'): - self.counts['skip-ingest-type'] += 1 + if raw["ingest_type"] not in ("pdf", "xml", "html"): + self.counts["skip-ingest-type"] += 1 return None request = { - 'ingest_type': raw['ingest_type'], - 'base_url': raw['base_url'], - 'link_source': raw['link_source'], - 'link_source_id': raw['link_source_id'], - 'ingest_request_source': raw.get('ingest_request_source'), - 'request': {}, + "ingest_type": raw["ingest_type"], + "base_url": raw["base_url"], + "link_source": raw["link_source"], + "link_source_id": raw["link_source_id"], + "ingest_request_source": raw.get("ingest_request_source"), + "request": {}, } # extra/optional fields - if raw.get('release_stage'): - request['release_stage'] = raw['release_stage'] - if raw.get('fatcat', {}).get('release_ident'): - request['request']['release_ident'] = raw['fatcat']['release_ident'] - for k in ('ext_ids', 'edit_extra', 'rel'): + if raw.get("release_stage"): + request["release_stage"] = raw["release_stage"] + if raw.get("fatcat", {}).get("release_ident"): + request["request"]["release_ident"] = raw["fatcat"]["release_ident"] + for k in ("ext_ids", "edit_extra", "rel"): if raw.get(k): - request['request'][k] = raw[k] + request["request"][k] = raw[k] # if this dict is empty, trim it to save DB space - if not request['request']: - request['request'] = None + if not request["request"]: + request["request"] = None return request def file_result_to_row(self, raw: dict) -> Optional[dict]: @@ -119,59 +124,68 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if there is a problem with conversion, return None and set skip count """ - for k in ('request', 'hit', 'status'): + for k in ("request", "hit", "status"): if k not in raw: - self.counts['skip-result-fields'] += 1 + self.counts["skip-result-fields"] += 1 return None - if 'base_url' not in raw['request']: - self.counts['skip-result-fields'] += 1 + if "base_url" not in raw["request"]: + self.counts["skip-result-fields"] += 1 return None - ingest_type = raw['request'].get('ingest_type') - if ingest_type == 'file': - ingest_type = 'pdf' - if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', - 'dataset-file'): - self.counts['skip-ingest-type'] += 1 + ingest_type = raw["request"].get("ingest_type") + if ingest_type == "file": + ingest_type = "pdf" + if ingest_type not in ( + "pdf", + "xml", + "html", + "component", + "src", + "dataset", + "dataset-file", + ): + self.counts["skip-ingest-type"] += 1 return None - if raw['status'] in ("existing", ): - self.counts['skip-existing'] += 1 + if raw["status"] in ("existing",): + self.counts["skip-existing"] += 1 return None result = { - 'ingest_type': ingest_type, - 'base_url': raw['request']['base_url'], - 'hit': raw['hit'], - 'status': raw['status'], + "ingest_type": ingest_type, + "base_url": raw["request"]["base_url"], + "hit": raw["hit"], + "status": raw["status"], } - terminal = raw.get('terminal') + terminal = raw.get("terminal") if terminal: - result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url') - result['terminal_dt'] = terminal.get('terminal_dt') - result['terminal_status_code'] = terminal.get( - 'terminal_status_code') or terminal.get('status_code') or terminal.get( - 'http_code') - if result['terminal_status_code']: - result['terminal_status_code'] = int(result['terminal_status_code']) - 
result['terminal_sha1hex'] = terminal.get('terminal_sha1hex') - if len(result['terminal_url']) > 2048: + result["terminal_url"] = terminal.get("terminal_url") or terminal.get("url") + result["terminal_dt"] = terminal.get("terminal_dt") + result["terminal_status_code"] = ( + terminal.get("terminal_status_code") + or terminal.get("status_code") + or terminal.get("http_code") + ) + if result["terminal_status_code"]: + result["terminal_status_code"] = int(result["terminal_status_code"]) + result["terminal_sha1hex"] = terminal.get("terminal_sha1hex") + if len(result["terminal_url"]) > 2048: # postgresql13 doesn't like extremely large URLs in b-tree index - self.counts['skip-huge-url'] += 1 + self.counts["skip-huge-url"] += 1 return None return result def result_to_html_meta(self, record: dict) -> Optional[HtmlMetaRow]: - html_body = record.get('html_body') - file_meta = record.get('file_meta') + html_body = record.get("html_body") + file_meta = record.get("file_meta") if not (file_meta and html_body): return None return HtmlMetaRow( sha1hex=file_meta["sha1hex"], - status=record.get('status'), - scope=record.get('scope'), - has_teixml=bool(html_body and html_body['status'] == 'success'), + status=record.get("status"), + scope=record.get("scope"), + has_teixml=bool(html_body and html_body["status"] == "success"), has_thumbnail=False, # TODO - word_count=(html_body and html_body.get('word_count')) or None, - biblio=record.get('html_biblio'), - resources=record.get('html_resources'), + word_count=(html_body and html_body.get("word_count")) or None, + biblio=record.get("html_biblio"), + resources=record.get("html_resources"), ) def result_to_platform_row(self, raw: dict) -> Optional[dict]: @@ -180,46 +194,49 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if there is a problem with conversion, return None and set skip count """ - for k in ('request', 'hit', 'status'): + for k in ("request", "hit", "status"): if k not in raw: return None - if 'base_url' not in raw['request']: + if "base_url" not in raw["request"]: return None - ingest_type = raw['request'].get('ingest_type') - if ingest_type not in ('dataset'): + ingest_type = raw["request"].get("ingest_type") + if ingest_type not in ("dataset"): return None - if raw['status'] in ("existing", ): + if raw["status"] in ("existing",): return None - if not raw.get('platform_name'): + if not raw.get("platform_name"): return None result = { - 'ingest_type': ingest_type, - 'base_url': raw['request']['base_url'], - 'hit': raw['hit'], - 'status': raw['status'], - 'platform_name': raw.get('platform_name'), - 'platform_domain': raw.get('platform_domain'), - 'platform_id': raw.get('platform_id'), - 'ingest_strategy': raw.get('ingest_strategy'), - 'total_size': raw.get('total_size'), - 'file_count': raw.get('file_count'), - 'archiveorg_item_name': raw.get('archiveorg_item_name'), - 'archiveorg_item_bundle_path': None, - 'web_bundle_url': None, - 'web_bundle_dt': None, - 'manifest': raw.get('manifest'), + "ingest_type": ingest_type, + "base_url": raw["request"]["base_url"], + "hit": raw["hit"], + "status": raw["status"], + "platform_name": raw.get("platform_name"), + "platform_domain": raw.get("platform_domain"), + "platform_id": raw.get("platform_id"), + "ingest_strategy": raw.get("ingest_strategy"), + "total_size": raw.get("total_size"), + "file_count": raw.get("file_count"), + "archiveorg_item_name": raw.get("archiveorg_item_name"), + "archiveorg_item_bundle_path": None, + "web_bundle_url": None, + "web_bundle_dt": None, + "manifest": 
raw.get("manifest"), } - if result.get('fileset_bundle'): - result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get( - 'archiveorg_item_bundle_path') - result['web_bundle_url'] = result['fileset_bundle'].get('terminal', - {}).get('terminal_url') - result['web_bundle_dt'] = result['fileset_bundle'].get('terminal', - {}).get('terminal_dt') + if result.get("fileset_bundle"): + result["archiveorg_item_bundle_path"] = result["fileset_bundle"].get( + "archiveorg_item_bundle_path" + ) + result["web_bundle_url"] = ( + result["fileset_bundle"].get("terminal", {}).get("terminal_url") + ) + result["web_bundle_dt"] = ( + result["fileset_bundle"].get("terminal", {}).get("terminal_dt") + ) return result def push_batch(self, batch: List[Any]) -> List[Any]: - self.counts['total'] += len(batch) + self.counts["total"] += len(batch) if not batch: return [] @@ -228,60 +245,62 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): results = [r for r in results_unfiltered if r] irequests_unfiltered = [ - self.request_to_row(raw['request']) for raw in batch if raw.get('request') + self.request_to_row(raw["request"]) for raw in batch if raw.get("request") ] irequests = [ - r for r in irequests_unfiltered if r and r['ingest_type'] != 'dataset-file' + r for r in irequests_unfiltered if r and r["ingest_type"] != "dataset-file" ] if irequests: resp = self.db.insert_ingest_request(self.cur, irequests) - self.counts['insert-requests'] += resp[0] - self.counts['update-requests'] += resp[1] + self.counts["insert-requests"] += resp[0] + self.counts["update-requests"] += resp[1] if results: resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update") - self.counts['insert-results'] += resp[0] - self.counts['update-results'] += resp[1] + self.counts["insert-results"] += resp[0] + self.counts["update-results"] += resp[1] # these schemas match, so can just pass through - cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')] + cdx_batch = [r["cdx"] for r in batch if r.get("hit") and r.get("cdx")] revisit_cdx_batch = [ - r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx') + r["revisit_cdx"] for r in batch if r.get("hit") and r.get("revisit_cdx") ] cdx_batch.extend(revisit_cdx_batch) # filter to full CDX lines, with full warc_paths (not liveweb) - cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])] + cdx_batch = [r for r in cdx_batch if r.get("warc_path") and ("/" in r["warc_path"])] if cdx_batch: resp = self.db.insert_cdx(self.cur, cdx_batch) - self.counts['insert-cdx'] += resp[0] - self.counts['update-cdx'] += resp[1] + self.counts["insert-cdx"] += resp[0] + self.counts["update-cdx"] += resp[1] - file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')] + file_meta_batch = [r["file_meta"] for r in batch if r.get("hit") and r.get("file_meta")] if file_meta_batch: resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="nothing") - self.counts['insert-file_meta'] += resp[0] - self.counts['update-file_meta'] += resp[1] + self.counts["insert-file_meta"] += resp[0] + self.counts["update-file_meta"] += resp[1] html_meta_batch = [ - self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body') + self.result_to_html_meta(r) for r in batch if r.get("hit") and r.get("html_body") ] if html_meta_batch: rows = [d.to_sql_tuple() for d in html_meta_batch if d] resp = self.db.insert_html_meta(self.cur, rows, on_conflict="update") - self.counts['insert-html_meta'] += 
resp[0] - self.counts['update-html_meta'] += resp[1] + self.counts["insert-html_meta"] += resp[0] + self.counts["update-html_meta"] += resp[1] fileset_platform_batch_all = [ - self.result_to_platform_row(raw) for raw in batch if - raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name') + self.result_to_platform_row(raw) + for raw in batch + if raw.get("request", {}).get("ingest_type") == "dataset" + and raw.get("platform_name") ] fileset_platform_batch: List[Dict] = [p for p in fileset_platform_batch_all if p] if fileset_platform_batch: - resp = self.db.insert_ingest_fileset_platform(self.cur, - fileset_platform_batch, - on_conflict="update") - self.counts['insert-fileset_platform'] += resp[0] - self.counts['update-fileset_platform'] += resp[1] + resp = self.db.insert_ingest_fileset_platform( + self.cur, fileset_platform_batch, on_conflict="update" + ) + self.counts["insert-fileset_platform"] += resp[0] + self.counts["update-fileset_platform"] += resp[1] self.db.commit() return [] @@ -307,7 +326,7 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker): raise NotImplementedError def push_batch(self, batch: list) -> list: - self.counts['total'] += len(batch) + self.counts["total"] += len(batch) if not batch: return [] @@ -317,8 +336,8 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker): if irequests: resp = self.db.insert_ingest_request(self.cur, irequests) - self.counts['insert-requests'] += resp[0] - self.counts['update-requests'] += resp[1] + self.counts["insert-requests"] += resp[0] + self.counts["update-requests"] += resp[1] self.db.commit() return [] @@ -329,13 +348,13 @@ class PersistGrobidWorker(SandcrawlerWorker): super().__init__() self.grobid = GrobidClient() self.s3 = SandcrawlerMinioClient( - host_url=kwargs.get('s3_url', 'localhost:9000'), - access_key=kwargs['s3_access_key'], - secret_key=kwargs['s3_secret_key'], - default_bucket=kwargs['s3_bucket'], + host_url=kwargs.get("s3_url", "localhost:9000"), + access_key=kwargs["s3_access_key"], + secret_key=kwargs["s3_secret_key"], + default_bucket=kwargs["s3_bucket"], ) - self.s3_only = kwargs.get('s3_only', False) - self.db_only = kwargs.get('db_only', False) + self.s3_only = kwargs.get("s3_only", False) + self.db_only = kwargs.get("db_only", False) assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed" if not self.s3_only: self.db: Optional[SandcrawlerPostgresClient] = SandcrawlerPostgresClient(db_url) @@ -349,58 +368,58 @@ class PersistGrobidWorker(SandcrawlerWorker): raise NotImplementedError def push_batch(self, batch: list) -> list: - self.counts['total'] += len(batch) + self.counts["total"] += len(batch) # filter out bad "missing status_code" timeout rows - missing = [r for r in batch if not r.get('status_code')] + missing = [r for r in batch if not r.get("status_code")] if missing: - self.counts['skip-missing-status'] += len(missing) - batch = [r for r in batch if r.get('status_code')] + self.counts["skip-missing-status"] += len(missing) + batch = [r for r in batch if r.get("status_code")] for r in batch: - if r['status_code'] != 200 or not r.get('tei_xml'): - self.counts['s3-skip-status'] += 1 - if r.get('error_msg'): - r['metadata'] = {'error_msg': r['error_msg'][:500]} + if r["status_code"] != 200 or not r.get("tei_xml"): + self.counts["s3-skip-status"] += 1 + if r.get("error_msg"): + r["metadata"] = {"error_msg": r["error_msg"][:500]} continue - assert len(r['key']) == 40 + assert len(r["key"]) == 40 if not self.db_only: 
self.s3.put_blob( folder="grobid", - blob=r['tei_xml'], - sha1hex=r['key'], + blob=r["tei_xml"], + sha1hex=r["key"], extension=".tei.xml", ) - self.counts['s3-put'] += 1 + self.counts["s3-put"] += 1 # enhance with teixml2json metadata, if available try: metadata = self.grobid.metadata(r) except xml.etree.ElementTree.ParseError as xml_e: - r['status'] = 'bad-grobid-xml' - r['metadata'] = {'error_msg': str(xml_e)[:1024]} + r["status"] = "bad-grobid-xml" + r["metadata"] = {"error_msg": str(xml_e)[:1024]} continue if not metadata: continue - for k in ('fatcat_release', 'grobid_version'): + for k in ("fatcat_release", "grobid_version"): r[k] = metadata.pop(k, None) - if r.get('fatcat_release'): - r['fatcat_release'] = r['fatcat_release'].replace('release_', '') - if metadata.get('grobid_timestamp'): - r['updated'] = metadata['grobid_timestamp'] - r['metadata'] = metadata + if r.get("fatcat_release"): + r["fatcat_release"] = r["fatcat_release"].replace("release_", "") + if metadata.get("grobid_timestamp"): + r["updated"] = metadata["grobid_timestamp"] + r["metadata"] = metadata if not self.s3_only: assert self.db and self.cur resp = self.db.insert_grobid(self.cur, batch, on_conflict="update") - self.counts['insert-grobid'] += resp[0] - self.counts['update-grobid'] += resp[1] + self.counts["insert-grobid"] += resp[0] + self.counts["update-grobid"] += resp[1] - file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')] + file_meta_batch = [r["file_meta"] for r in batch if r.get("file_meta")] resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update") - self.counts['insert-file-meta'] += resp[0] - self.counts['update-file-meta'] += resp[1] + self.counts["insert-file-meta"] += resp[0] + self.counts["update-file-meta"] += resp[1] self.db.commit() @@ -413,6 +432,7 @@ class PersistGrobidDiskWorker(SandcrawlerWorker): This could be refactored into a "Sink" type with an even thinner wrapper. 
""" + def __init__(self, output_dir: str): super().__init__() self.output_dir = output_dir @@ -428,14 +448,14 @@ class PersistGrobidDiskWorker(SandcrawlerWorker): def process(self, record: Any, key: Optional[str] = None) -> Any: - if record.get('status_code') != 200 or not record.get('tei_xml'): + if record.get("status_code") != 200 or not record.get("tei_xml"): return False - assert (len(record['key'])) == 40 - p = "{}/{}".format(self.output_dir, self._blob_path(record['key'])) + assert (len(record["key"])) == 40 + p = "{}/{}".format(self.output_dir, self._blob_path(record["key"])) os.makedirs(os.path.dirname(p), exist_ok=True) - with open(p, 'w') as f: - f.write(record.pop('tei_xml')) - self.counts['written'] += 1 + with open(p, "w") as f: + f.write(record.pop("tei_xml")) + self.counts["written"] += 1 return record @@ -450,24 +470,25 @@ class PersistPdfTrioWorker(SandcrawlerWorker): raise NotImplementedError def push_batch(self, batch: list) -> list: - self.counts['total'] += len(batch) + self.counts["total"] += len(batch) - batch = [r for r in batch if 'pdf_trio' in r and r['pdf_trio'].get('status_code')] + batch = [r for r in batch if "pdf_trio" in r and r["pdf_trio"].get("status_code")] for r in batch: # copy key (sha1hex) into sub-object - r['pdf_trio']['key'] = r['key'] - pdftrio_batch = [r['pdf_trio'] for r in batch] + r["pdf_trio"]["key"] = r["key"] + pdftrio_batch = [r["pdf_trio"] for r in batch] resp = self.db.insert_pdftrio(self.cur, pdftrio_batch, on_conflict="update") - self.counts['insert-pdftrio'] += resp[0] - self.counts['update-pdftrio'] += resp[1] + self.counts["insert-pdftrio"] += resp[0] + self.counts["update-pdftrio"] += resp[1] file_meta_batch = [ - r['file_meta'] for r in batch - if r['pdf_trio']['status'] == "success" and r.get('file_meta') + r["file_meta"] + for r in batch + if r["pdf_trio"]["status"] == "success" and r.get("file_meta") ] resp = self.db.insert_file_meta(self.cur, file_meta_batch) - self.counts['insert-file-meta'] += resp[0] - self.counts['update-file-meta'] += resp[1] + self.counts["insert-file-meta"] += resp[0] + self.counts["update-file-meta"] += resp[1] self.db.commit() return [] @@ -479,16 +500,17 @@ class PersistPdfTextWorker(SandcrawlerWorker): Should keep batch sizes small. 
""" + def __init__(self, db_url: str, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( - host_url=kwargs.get('s3_url', 'localhost:9000'), - access_key=kwargs['s3_access_key'], - secret_key=kwargs['s3_secret_key'], - default_bucket=kwargs['s3_bucket'], + host_url=kwargs.get("s3_url", "localhost:9000"), + access_key=kwargs["s3_access_key"], + secret_key=kwargs["s3_secret_key"], + default_bucket=kwargs["s3_bucket"], ) - self.s3_only = kwargs.get('s3_only', False) - self.db_only = kwargs.get('db_only', False) + self.s3_only = kwargs.get("s3_only", False) + self.db_only = kwargs.get("db_only", False) assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed" if not self.s3_only: self.db: Optional[SandcrawlerPostgresClient] = SandcrawlerPostgresClient(db_url) @@ -502,17 +524,17 @@ class PersistPdfTextWorker(SandcrawlerWorker): raise NotImplementedError def push_batch(self, batch: list) -> list: - self.counts['total'] += len(batch) + self.counts["total"] += len(batch) parsed_batch = [] for r in batch: parsed_batch.append(PdfExtractResult.from_pdftext_dict(r)) for r in parsed_batch: - if r.status != 'success' or not r.text: - self.counts['s3-skip-status'] += 1 + if r.status != "success" or not r.text: + self.counts["s3-skip-status"] += 1 if r.error_msg: - r.metadata = {'error_msg': r.error_msg[:500]} + r.metadata = {"error_msg": r.error_msg[:500]} continue assert len(r.sha1hex) == 40 @@ -523,19 +545,19 @@ class PersistPdfTextWorker(SandcrawlerWorker): sha1hex=r.sha1hex, extension=".txt", ) - self.counts['s3-put'] += 1 + self.counts["s3-put"] += 1 if not self.s3_only: assert self.db and self.cur rows = [r.to_sql_tuple() for r in parsed_batch] resp = self.db.insert_pdf_meta(self.cur, rows, on_conflict="update") - self.counts['insert-pdf-meta'] += resp[0] - self.counts['update-pdf-meta'] += resp[1] + self.counts["insert-pdf-meta"] += resp[0] + self.counts["update-pdf-meta"] += resp[1] file_meta_batch = [r.file_meta for r in parsed_batch if r.file_meta] resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update") - self.counts['insert-file-meta'] += resp[0] - self.counts['update-file-meta'] += resp[1] + self.counts["insert-file-meta"] += resp[0] + self.counts["update-file-meta"] += resp[1] self.db.commit() @@ -550,16 +572,17 @@ class PersistThumbnailWorker(SandcrawlerWorker): This worker *must* be used with raw kakfa mode; thumbnails are *not* wrapped in JSON like most sandcrawler kafka messages. 
""" + def __init__(self, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( - host_url=kwargs.get('s3_url', 'localhost:9000'), - access_key=kwargs['s3_access_key'], - secret_key=kwargs['s3_secret_key'], - default_bucket=kwargs['s3_bucket'], + host_url=kwargs.get("s3_url", "localhost:9000"), + access_key=kwargs["s3_access_key"], + secret_key=kwargs["s3_secret_key"], + default_bucket=kwargs["s3_bucket"], ) - self.s3_extension = kwargs.get('s3_extension', ".jpg") - self.s3_folder = kwargs.get('s3_folder', "pdf") + self.s3_extension = kwargs.get("s3_extension", ".jpg") + self.s3_folder = kwargs.get("s3_folder", "pdf") def process(self, record: Any, key: Optional[str] = None) -> Any: """ @@ -569,7 +592,7 @@ class PersistThumbnailWorker(SandcrawlerWorker): assert isinstance(record, bytes) blob: bytes = record if isinstance(key, bytes): - key = key.decode('utf-8') + key = key.decode("utf-8") assert key is not None and len(key) == 40 and isinstance(key, str) assert len(blob) >= 50 @@ -579,7 +602,7 @@ class PersistThumbnailWorker(SandcrawlerWorker): sha1hex=key, extension=self.s3_extension, ) - self.counts['s3-put'] += 1 + self.counts["s3-put"] += 1 class GenericPersistDocWorker(SandcrawlerWorker): @@ -588,39 +611,40 @@ class GenericPersistDocWorker(SandcrawlerWorker): Objects are assumed to be JSON-wrapped strings. """ + def __init__(self, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( - host_url=kwargs.get('s3_url', 'localhost:9000'), - access_key=kwargs['s3_access_key'], - secret_key=kwargs['s3_secret_key'], - default_bucket=kwargs['s3_bucket'], + host_url=kwargs.get("s3_url", "localhost:9000"), + access_key=kwargs["s3_access_key"], + secret_key=kwargs["s3_secret_key"], + default_bucket=kwargs["s3_bucket"], ) - self.s3_extension = kwargs.get('s3_extension', ".unknown") - self.s3_folder = kwargs.get('s3_folder', "unknown") + self.s3_extension = kwargs.get("s3_extension", ".unknown") + self.s3_folder = kwargs.get("s3_folder", "unknown") self.doc_key = "unknown" def process(self, record: Any, key: Optional[str] = None) -> Any: - if record.get('status') != 'success' or not record.get(self.doc_key): + if record.get("status") != "success" or not record.get(self.doc_key): return assert key is not None if isinstance(key, bytes): - key_str = key.decode('utf-8') + key_str = key.decode("utf-8") elif isinstance(key, str): key_str = key assert len(key_str) == 40 - if 'sha1hex' in record: - assert key_str == record['sha1hex'] + if "sha1hex" in record: + assert key_str == record["sha1hex"] self.s3.put_blob( folder=self.s3_folder, - blob=record[self.doc_key].encode('utf-8'), + blob=record[self.doc_key].encode("utf-8"), sha1hex=key_str, extension=self.s3_extension, ) - self.counts['s3-put'] += 1 + self.counts["s3-put"] += 1 class PersistXmlDocWorker(GenericPersistDocWorker): @@ -628,10 +652,11 @@ class PersistXmlDocWorker(GenericPersistDocWorker): Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to sandcrawler database (SQL). """ + def __init__(self, **kwargs): super().__init__(**kwargs) - self.s3_extension = kwargs.get('s3_extension', ".jats.xml") - self.s3_folder = kwargs.get('s3_folder', "xml_doc") + self.s3_extension = kwargs.get("s3_extension", ".jats.xml") + self.s3_folder = kwargs.get("s3_folder", "xml_doc") self.doc_key = "jats_xml" @@ -640,8 +665,9 @@ class PersistHtmlTeiXmlWorker(GenericPersistDocWorker): Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to sandcrawler database (SQL). 
""" + def __init__(self, **kwargs): super().__init__(**kwargs) - self.s3_extension = kwargs.get('s3_extension', ".tei.xml") - self.s3_folder = kwargs.get('s3_folder', "html_body") + self.s3_extension = kwargs.get("s3_extension", ".tei.xml") + self.s3_folder = kwargs.get("s3_folder", "html_body") self.doc_key = "tei_xml" diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index ceb6671..bd7f36a 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -10,8 +10,13 @@ from typing import Any, Dict, List, Optional, Sequence import requests from confluent_kafka import Consumer, KafkaException, Producer -from .ia import (PetaboxError, SandcrawlerBackoffError, WaybackClient, WaybackContentError, - WaybackError) +from .ia import ( + PetaboxError, + SandcrawlerBackoffError, + WaybackClient, + WaybackContentError, + WaybackError, +) from .misc import parse_cdx_line @@ -22,25 +27,26 @@ class SandcrawlerWorker(object): Usually these get "pushed" into by a RecordPusher. Output goes to another worker (pipeline-style), or defaults to stdout. """ - def __init__(self, sink: Optional['SandcrawlerWorker'] = None): + + def __init__(self, sink: Optional["SandcrawlerWorker"] = None): self.counts: Counter = Counter() self.sink: Optional[SandcrawlerWorker] = sink def push_record(self, task: Any, key: Optional[str] = None) -> Any: - self.counts['total'] += 1 + self.counts["total"] += 1 if not self.want(task): - self.counts['skip'] += 1 + self.counts["skip"] += 1 return result = self.process(task, key=key) if not result: - self.counts['failed'] += 1 + self.counts["failed"] += 1 return - elif type(result) == dict and 'status' in result and len(result['status']) < 32: - self.counts[result['status']] += 1 + elif type(result) == dict and "status" in result and len(result["status"]) < 32: + self.counts[result["status"]] += 1 if self.sink: self.sink.push_record(result) - self.counts['pushed'] += 1 + self.counts["pushed"] += 1 else: print(json.dumps(result)) return result @@ -53,10 +59,9 @@ class SandcrawlerWorker(object): """ return None - def push_record_timeout(self, - task: Any, - key: Optional[str] = None, - timeout: int = 300) -> Any: + def push_record_timeout( + self, task: Any, key: Optional[str] = None, timeout: int = 300 + ) -> Any: """ A wrapper around self.push_record which sets a timeout. @@ -64,6 +69,7 @@ class SandcrawlerWorker(object): multithreading or if signal-based timeouts are used elsewhere in the same process. """ + def timeout_handler(signum: int, frame: Any) -> None: raise TimeoutError("timeout processing record") @@ -73,12 +79,12 @@ class SandcrawlerWorker(object): try: resp = self.push_record(task, key=key) except TimeoutError: - self.counts['timeout'] += 1 + self.counts["timeout"] += 1 resp = self.timeout_response(task) # pylint: disable=assignment-from-none # TODO: what if it is this push_record() itself that is timing out? if resp and self.sink: self.sink.push_record(resp) - self.counts['pushed'] += 1 + self.counts["pushed"] += 1 elif resp: print(json.dumps(resp)) finally: @@ -109,7 +115,7 @@ class SandcrawlerWorker(object): TODO: should derived workers explicitly type-check the 'task' object? """ - raise NotImplementedError('implementation required') + raise NotImplementedError("implementation required") class SandcrawlerFetchWorker(SandcrawlerWorker): @@ -117,25 +123,26 @@ class SandcrawlerFetchWorker(SandcrawlerWorker): Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg, PDFs) from wayback, archive.org, or other sources. 
""" + def __init__(self, wayback_client: Optional[WaybackClient], **kwargs): super().__init__(**kwargs) self.wayback_client = wayback_client def fetch_blob(self, record: Dict[str, Any]) -> Dict[str, Any]: - default_key = record['sha1hex'] + default_key = record["sha1hex"] wayback_sec = None petabox_sec = None - if record.get('warc_path') and record.get('warc_offset'): + if record.get("warc_path") and record.get("warc_offset"): # it's a full CDX dict. fetch using WaybackClient if not self.wayback_client: raise Exception("wayback client not configured for this SandcrawlerFetchWorker") try: start = time.time() blob: bytes = self.wayback_client.fetch_petabox_body( - csize=record['warc_csize'], - offset=record['warc_offset'], - warc_path=record['warc_path'], + csize=record["warc_csize"], + offset=record["warc_offset"], + warc_path=record["warc_path"], ) wayback_sec = time.time() - start except (WaybackError, WaybackContentError, PetaboxError, KeyError) as we: @@ -145,15 +152,15 @@ class SandcrawlerFetchWorker(SandcrawlerWorker): status="error-wayback", error_msg=str(we), ) - elif record.get('url') and record.get('datetime'): + elif record.get("url") and record.get("datetime"): # it's a partial CDX dict or something? fetch using WaybackClient if not self.wayback_client: raise Exception("wayback client not configured for this SandcrawlerFetchWorker") try: start = time.time() blob = self.wayback_client.fetch_replay_body( - url=record['url'], - datetime=record['datetime'], + url=record["url"], + datetime=record["datetime"], ) wayback_sec = time.time() - start except (WaybackError, WaybackContentError) as we: @@ -163,11 +170,12 @@ class SandcrawlerFetchWorker(SandcrawlerWorker): status="error-wayback", error_msg=str(we), ) - elif record.get('item') and record.get('path'): + elif record.get("item") and record.get("path"): # it's petabox link; fetch via HTTP start = time.time() - ia_resp = requests.get("https://archive.org/serve/{}/{}".format( - record['item'], record['path'])) + ia_resp = requests.get( + "https://archive.org/serve/{}/{}".format(record["item"], record["path"]) + ) petabox_sec = time.time() - start try: ia_resp.raise_for_status() @@ -181,7 +189,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker): blob = ia_resp.content else: raise ValueError( - "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") + "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed" + ) if not blob: return dict( key=default_key, @@ -201,29 +210,31 @@ class SandcrawlerFetchWorker(SandcrawlerWorker): class MultiprocessWrapper(SandcrawlerWorker): - def __init__(self, - worker: SandcrawlerWorker, - sink: Optional[SandcrawlerWorker] = None, - jobs: Optional[int] = None): + def __init__( + self, + worker: SandcrawlerWorker, + sink: Optional[SandcrawlerWorker] = None, + jobs: Optional[int] = None, + ): self.counts = Counter() self.worker = worker self.sink = sink self.pool = multiprocessing.pool.Pool(jobs) def push_batch(self, tasks: List[Any]) -> List[Any]: - self.counts['total'] += len(tasks) + self.counts["total"] += len(tasks) print("... 
processing batch of: {}".format(len(tasks)), file=sys.stderr) results = self.pool.map(self.worker.process, tasks) for result in results: if not result: - self.counts['failed'] += 1 + self.counts["failed"] += 1 return [] - elif type(result) == dict and 'status' in result and len(result['status']) < 32: - self.counts[result['status']] += 1 + elif type(result) == dict and "status" in result and len(result["status"]) < 32: + self.counts[result["status"]] += 1 if self.sink: self.sink.push_record(result) - self.counts['pushed'] += 1 + self.counts["pushed"] += 1 else: print(json.dumps(result)) return results @@ -243,6 +254,7 @@ class BlackholeSink(SandcrawlerWorker): Useful for tests. """ + def push_record(self, task: Any, key: Optional[str] = None) -> Any: return @@ -257,12 +269,14 @@ class KafkaSink(SandcrawlerWorker): self.produce_topic = produce_topic self.kafka_hosts = kafka_hosts - config = self.producer_config({ - 'bootstrap.servers': kafka_hosts, - 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes - 'api.version.request': True, - 'api.version.fallback.ms': 0, - }) + config = self.producer_config( + { + "bootstrap.servers": kafka_hosts, + "message.max.bytes": 30000000, # ~30 MBytes; broker is ~50 MBytes + "api.version.request": True, + "api.version.fallback.ms": 0, + } + ) self.producer = Producer(config) @staticmethod @@ -275,27 +289,29 @@ class KafkaSink(SandcrawlerWorker): def producer_config(self, kafka_config: dict) -> dict: config = kafka_config.copy() - config.update({ - 'delivery.report.only.error': True, - 'default.topic.config': { - 'message.timeout.ms': 30000, - 'request.required.acks': -1, # all brokers must confirm + config.update( + { + "delivery.report.only.error": True, + "default.topic.config": { + "message.timeout.ms": 30000, + "request.required.acks": -1, # all brokers must confirm + }, } - }) + ) return config def push_record(self, msg: Any, key: Optional[str] = None) -> Any: - self.counts['total'] += 1 + self.counts["total"] += 1 if type(msg) == dict: - if not key and 'key' in msg: - key = msg['key'] + if not key and "key" in msg: + key = msg["key"] msg = json.dumps(msg) if type(msg) == str: - msg = msg.encode('utf-8') + msg = msg.encode("utf-8") assert type(msg) == bytes self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast) - self.counts['produced'] += 1 + self.counts["produced"] += 1 # check for errors etc self.producer.poll(0) @@ -314,19 +330,22 @@ class KafkaCompressSink(KafkaSink): """ Variant of KafkaSink for large documents. Used for, eg, GROBID output. """ + def producer_config(self, kafka_config: Dict[str, Any]) -> Dict[str, Any]: config = kafka_config.copy() - config.update({ - 'compression.codec': 'gzip', - 'retry.backoff.ms': 250, - 'linger.ms': 1000, - 'batch.num.messages': 50, - 'delivery.report.only.error': True, - 'default.topic.config': { - 'message.timeout.ms': 30000, - 'request.required.acks': -1, # all brokers must confirm + config.update( + { + "compression.codec": "gzip", + "retry.backoff.ms": 250, + "linger.ms": 1000, + "batch.num.messages": 50, + "delivery.report.only.error": True, + "default.topic.config": { + "message.timeout.ms": 30000, + "request.required.acks": -1, # all brokers must confirm + }, } - }) + ) return config @@ -335,6 +354,7 @@ class RecordPusher: Base class for different record sources to be pushed into workers. Pretty trivial interface, just wraps an importer and pushes records in to it. 
""" + def __init__(self, worker: SandcrawlerWorker, **kwargs): self.counts: Counter = Counter() self.worker: SandcrawlerWorker = worker @@ -356,7 +376,7 @@ class JsonLinePusher(RecordPusher): self.counts = Counter() self.worker = worker self.json_file = json_file - self.batch_size = kwargs.get('batch_size', None) + self.batch_size = kwargs.get("batch_size", None) if self.batch_size in (0, 1): self.batch_size = None @@ -365,24 +385,24 @@ class JsonLinePusher(RecordPusher): for line in self.json_file: if not line: continue - self.counts['total'] += 1 + self.counts["total"] += 1 try: record = json.loads(line) except json.decoder.JSONDecodeError: - self.counts['error-json-decode'] += 1 + self.counts["error-json-decode"] += 1 continue if self.batch_size: batch.append(record) if len(batch) >= self.batch_size: self.worker.push_batch(batch) - self.counts['pushed'] += len(batch) + self.counts["pushed"] += len(batch) batch = [] else: self.worker.push_record(record) - self.counts['pushed'] += 1 + self.counts["pushed"] += 1 if self.batch_size and batch: self.worker.push_batch(batch) - self.counts['pushed'] += len(batch) + self.counts["pushed"] += len(batch) batch = [] self.worker.finish() print("JSON lines pushed: {}".format(self.counts), file=sys.stderr) @@ -394,10 +414,10 @@ class CdxLinePusher(RecordPusher): self.counts = Counter() self.worker = worker self.cdx_file = cdx_file - self.filter_http_statuses = kwargs.get('filter_http_statuses', None) - self.filter_mimetypes = kwargs.get('filter_mimetypes', None) - self.allow_octet_stream = kwargs.get('allow_octet_stream', False) - self.batch_size = kwargs.get('batch_size', None) + self.filter_http_statuses = kwargs.get("filter_http_statuses", None) + self.filter_mimetypes = kwargs.get("filter_mimetypes", None) + self.allow_octet_stream = kwargs.get("allow_octet_stream", False) + self.batch_size = kwargs.get("batch_size", None) if self.batch_size in (0, 1): self.batch_size = None @@ -406,30 +426,32 @@ class CdxLinePusher(RecordPusher): for line in self.cdx_file: if not line: continue - self.counts['total'] += 1 + self.counts["total"] += 1 record = parse_cdx_line(line, normalize=True) if not record: - self.counts['skip-parse'] += 1 + self.counts["skip-parse"] += 1 continue - if self.filter_http_statuses and record[ - 'http_status'] not in self.filter_http_statuses: - self.counts['skip-http_status'] += 1 + if ( + self.filter_http_statuses + and record["http_status"] not in self.filter_http_statuses + ): + self.counts["skip-http_status"] += 1 continue - if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes: - self.counts['skip-mimetype'] += 1 + if self.filter_mimetypes and record["mimetype"] not in self.filter_mimetypes: + self.counts["skip-mimetype"] += 1 continue if self.batch_size: batch.append(record) if len(batch) >= self.batch_size: self.worker.push_batch(batch) - self.counts['pushed'] += len(batch) + self.counts["pushed"] += len(batch) batch = [] else: self.worker.push_record(record) - self.counts['pushed'] += 1 + self.counts["pushed"] += 1 if self.batch_size and batch: self.worker.push_batch(batch) - self.counts['pushed'] += len(batch) + self.counts["pushed"] += len(batch) batch = [] self.worker.finish() print("CDX lines pushed: {}".format(self.counts), file=sys.stderr) @@ -442,33 +464,33 @@ class ZipfilePusher(RecordPusher): self.worker = worker self.filter_suffix = ".pdf" self.zipfile_path = zipfile_path - self.batch_size = kwargs.get('batch_size', None) + self.batch_size = kwargs.get("batch_size", None) if self.batch_size 
in (0, 1): self.batch_size = None def run(self) -> Counter: batch = [] - with zipfile.ZipFile(self.zipfile_path, 'r') as archive: + with zipfile.ZipFile(self.zipfile_path, "r") as archive: for zipinfo in archive.infolist(): if not zipinfo.filename.endswith(self.filter_suffix): continue - self.counts['total'] += 1 + self.counts["total"] += 1 # NB doesn't really extract the file, just gives you a stream (file-like-object) for reading it - flo = archive.open(zipinfo, 'r') - data = flo.read(2**32) + flo = archive.open(zipinfo, "r") + data = flo.read(2 ** 32) flo.close() if self.batch_size: batch.append(data) if len(batch) >= self.batch_size: self.worker.push_batch(batch) - self.counts['pushed'] += len(batch) + self.counts["pushed"] += len(batch) batch = [] else: self.worker.push_record(data) - self.counts['pushed'] += 1 + self.counts["pushed"] += 1 if self.batch_size and batch: self.worker.push_batch(batch) - self.counts['pushed'] += len(batch) + self.counts["pushed"] += len(batch) batch = [] self.worker.finish() print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr) @@ -476,8 +498,14 @@ class ZipfilePusher(RecordPusher): class KafkaJsonPusher(RecordPusher): - def __init__(self, worker: SandcrawlerWorker, kafka_hosts: str, consume_topic: str, - group: str, **kwargs): + def __init__( + self, + worker: SandcrawlerWorker, + kafka_hosts: str, + consume_topic: str, + group: str, + **kwargs + ): self.counts = Counter() self.worker = worker self.consumer = make_kafka_consumer( @@ -485,14 +513,14 @@ class KafkaJsonPusher(RecordPusher): consume_topic, group, ) - self.push_batches = kwargs.get('push_batches', False) - self.raw_records = kwargs.get('raw_records', False) - self.poll_interval = kwargs.get('poll_interval', 5.0) - self.batch_size = kwargs.get('batch_size', 100) + self.push_batches = kwargs.get("push_batches", False) + self.raw_records = kwargs.get("raw_records", False) + self.poll_interval = kwargs.get("poll_interval", 5.0) + self.batch_size = kwargs.get("batch_size", 100) if self.batch_size in (0, 1): self.batch_size = 1 - self.batch_worker = kwargs.get('batch_worker', False) - self.process_timeout_sec = kwargs.get('process_timeout_sec', 300) + self.batch_worker = kwargs.get("batch_worker", False) + self.process_timeout_sec = kwargs.get("process_timeout_sec", 300) def run(self) -> Counter: while True: @@ -502,11 +530,15 @@ class KafkaJsonPusher(RecordPusher): # case where there there is one update and thousands of creates; # update would be lingering in worker, and if worker crashed # never created. Not great. - batch = self.consumer.consume(num_messages=self.batch_size, - timeout=self.poll_interval) - print("... got {} kafka messages ({}sec poll interval)".format( - len(batch), self.poll_interval), - file=sys.stderr) + batch = self.consumer.consume( + num_messages=self.batch_size, timeout=self.poll_interval + ) + print( + "... got {} kafka messages ({}sec poll interval)".format( + len(batch), self.poll_interval + ), + file=sys.stderr, + ) if not batch: # TODO: could have some larger timeout here and # self.worker.finish() if it's been more than, eg, a couple @@ -518,14 +550,14 @@ class KafkaJsonPusher(RecordPusher): raise KafkaException(msg.error()) # ... 
then process if self.push_batches: - self.counts['total'] += len(batch) - records = [json.loads(msg.value().decode('utf-8')) for msg in batch] + self.counts["total"] += len(batch) + records = [json.loads(msg.value().decode("utf-8")) for msg in batch] self.worker.push_batch(records) - self.counts['pushed'] += len(batch) + self.counts["pushed"] += len(batch) print("Import counts: {}".format(self.worker.counts), file=sys.stderr) else: for msg in batch: - self.counts['total'] += 1 + self.counts["total"] += 1 if self.raw_records: # In this mode, pass the Kafka message as bytes through # without decoding as JSON. Eg, for thumbnails (where @@ -533,7 +565,7 @@ class KafkaJsonPusher(RecordPusher): # from the message) record = msg.value() else: - record = json.loads(msg.value().decode('utf-8')) + record = json.loads(msg.value().decode("utf-8")) # This complex bit of code implements backoff/backpressure # in a way that will not cause this Kafka consumer to lose # partition assignments (resulting in a rebalance). This @@ -543,9 +575,9 @@ class KafkaJsonPusher(RecordPusher): while not done: try: # use timeouts; don't want kafka itself to timeout - self.worker.push_record_timeout(record, - key=msg.key(), - timeout=self.process_timeout_sec) + self.worker.push_record_timeout( + record, key=msg.key(), timeout=self.process_timeout_sec + ) break except SandcrawlerBackoffError as be: print("Backing off for 200 seconds: {}".format(be)) @@ -557,8 +589,8 @@ class KafkaJsonPusher(RecordPusher): assert not empty_batch time.sleep(5) self.consumer.resume(self.consumer.assignment()) - self.counts['pushed'] += 1 - if self.counts['total'] % 500 == 0: + self.counts["pushed"] += 1 + if self.counts["total"] % 500 == 0: print("Import counts: {}".format(self.worker.counts), file=sys.stderr) for msg in batch: # locally store offsets of processed messages; will be @@ -589,25 +621,25 @@ def make_kafka_consumer(hosts: str, consume_topic: str, group: str) -> Consumer: print("Bailing out...", file=sys.stderr) # TODO: should it be sys.exit(-1)? raise KafkaException(p.error) - #print("Kafka consumer commit successful") + # print("Kafka consumer commit successful") pass # previously, using pykafka - #auto_commit_enable=True, - #auto_commit_interval_ms=30000, # 30 seconds + # auto_commit_enable=True, + # auto_commit_interval_ms=30000, # 30 seconds conf = { - 'bootstrap.servers': hosts, - 'group.id': group, - 'on_commit': fail_fast, + "bootstrap.servers": hosts, + "group.id": group, + "on_commit": fail_fast, # messages don't have offset marked as stored until processed, # but we do auto-commit stored offsets to broker - 'enable.auto.offset.store': False, - 'enable.auto.commit': True, + "enable.auto.offset.store": False, + "enable.auto.commit": True, # user code timeout; if no poll after this long, assume user code # hung and rebalance (default: 6min) - 'max.poll.interval.ms': 360000, - 'default.topic.config': { - 'auto.offset.reset': 'latest', + "max.poll.interval.ms": 360000, + "default.topic.config": { + "auto.offset.reset": "latest", }, } @@ -615,8 +647,9 @@ def make_kafka_consumer(hosts: str, consume_topic: str, group: str) -> Consumer: for p in partitions: if p.error: raise KafkaException(p.error) - print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions), - file=sys.stderr) + print( + "Kafka partitions rebalanced: {} / {}".format(consumer, partitions), file=sys.stderr + ) consumer = Consumer(conf) # NOTE: it's actually important that topic_name *not* be bytes (UTF-8 -- cgit v1.2.3
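To tie the consumer side together: KafkaJsonPusher pairs make_kafka_consumer() with a worker, handling batched polling, per-record timeouts, and the pause/resume backpressure described above, and offsets are only stored after records have actually been processed. A wiring sketch, with broker, topic, and consumer group names as placeholders and BlackholeSink standing in for a real worker:

    from sandcrawler.workers import BlackholeSink, KafkaJsonPusher

    pusher = KafkaJsonPusher(
        worker=BlackholeSink(),    # stand-in; a real deployment would use eg a persist worker
        kafka_hosts="localhost:9092",
        consume_topic="sandcrawler-qa.example-output",
        group="qa-example-workers",
        batch_size=100,
        process_timeout_sec=300,
    )
    pusher.run()   # loops forever; offsets are committed only after processing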