author     Bryan Newbold <bnewbold@archive.org>  2021-10-27 18:50:17 -0700
committer  Bryan Newbold <bnewbold@archive.org>  2021-10-27 18:50:17 -0700
commit     826c7538e091fac14d987a3cd654975da964e240 (patch)
tree       90345b4cabb461c624ca5a218c2fc01dce3055cd /python/sandcrawler
parent     020037d4714e7ba2ab172c7278494aed0b2148ad (diff)
download   sandcrawler-826c7538e091fac14d987a3cd654975da964e240.tar.gz
           sandcrawler-826c7538e091fac14d987a3cd654975da964e240.zip
make fmt (black 21.9b0)
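
This commit is a pure formatting pass: per the title, the project's "make fmt" target was run with black pinned at 21.9b0, so no behavior change is intended. The most visible pattern in the hunks below is how black rewrites long import lists: when the names no longer fit on one line, the list is parenthesized and, if it still will not fit on a single indented line, each name goes on its own line with a trailing comma. A minimal before/after sketch, abbreviated from the __init__.py hunk below (only a subset of the imported names is shown):

# Before: hanging-indent continuation, as in the old __init__.py
from .ia import (CdxApiClient, CdxApiError, CdxPartial, CdxRow, PetaboxError,
                 ResourceResult, SavePageNowClient)

# After black 21.9b0: parenthesized block, one name per line, trailing comma
from .ia import (
    CdxApiClient,
    CdxApiError,
    CdxPartial,
    CdxRow,
    PetaboxError,
    ResourceResult,
    SavePageNowClient,
)
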
Diffstat (limited to 'python/sandcrawler')
-rw-r--r--  python/sandcrawler/__init__.py             52
-rw-r--r--  python/sandcrawler/db.py                  263
-rw-r--r--  python/sandcrawler/fileset_platforms.py   463
-rw-r--r--  python/sandcrawler/fileset_strategies.py  171
-rw-r--r--  python/sandcrawler/fileset_types.py         2
-rw-r--r--  python/sandcrawler/grobid.py              105
-rw-r--r--  python/sandcrawler/html.py                253
-rw-r--r--  python/sandcrawler/html_metadata.py       133
-rw-r--r--  python/sandcrawler/ia.py                  611
-rw-r--r--  python/sandcrawler/ingest_file.py         477
-rw-r--r--  python/sandcrawler/ingest_fileset.py      372
-rw-r--r--  python/sandcrawler/ingest_html.py         139
-rw-r--r--  python/sandcrawler/minio.py                52
-rw-r--r--  python/sandcrawler/misc.py                115
-rw-r--r--  python/sandcrawler/pdfextract.py          147
-rw-r--r--  python/sandcrawler/pdftrio.py              78
-rw-r--r--  python/sandcrawler/persist.py             450
-rw-r--r--  python/sandcrawler/workers.py             289
18 files changed, 2332 insertions, 1840 deletions
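
Beyond import wrapping, two more black conventions account for most of the remaining churn in the hunks below: string literals (including dict keys) are normalized to double quotes, and signatures or calls that overflow black's default 88-character line length are wrapped, with each parameter on its own line plus a trailing comma whenever they do not all fit on a single indented line. A small self-contained Python sketch of the resulting shape; the function name and sample data are illustrative stand-ins, not code from this repository:

from typing import Any, Dict, List, Tuple


def filter_cdx_rows(
    batch: List[Dict[str, Any]],
    required_key: str = "warc_path",
) -> Tuple[int, List[Dict[str, Any]]]:
    # Double-quoted strings and the trailing comma above are black's style;
    # the "magic trailing comma" keeps the signature exploded one-per-line.
    kept = [d for d in batch if d.get(required_key)]
    return (len(kept), kept)


rows = [{"warc_path": "example.warc.gz"}, {"url": "https://example.com"}]
print(filter_cdx_rows(batch=rows, required_key="warc_path"))
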
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index 46735eb..6718c57 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -1,16 +1,48 @@
from .db import SandcrawlerPostgresClient, SandcrawlerPostgrestClient
from .grobid import GrobidBlobWorker, GrobidClient, GrobidWorker
-from .ia import (CdxApiClient, CdxApiError, CdxPartial, CdxRow, PetaboxError, ResourceResult,
- SavePageNowClient, SavePageNowError, WarcResource, WaybackClient,
- WaybackContentError, WaybackError)
+from .ia import (
+ CdxApiClient,
+ CdxApiError,
+ CdxPartial,
+ CdxRow,
+ PetaboxError,
+ ResourceResult,
+ SavePageNowClient,
+ SavePageNowError,
+ WarcResource,
+ WaybackClient,
+ WaybackContentError,
+ WaybackError,
+)
from .ingest_file import IngestFileWorker
from .ingest_fileset import IngestFilesetWorker
-from .misc import (b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path,
- parse_cdx_datetime, parse_cdx_line)
+from .misc import (
+ b32_hex,
+ clean_url,
+ gen_file_metadata,
+ gen_file_metadata_path,
+ parse_cdx_datetime,
+ parse_cdx_line,
+)
from .pdfextract import PdfExtractBlobWorker, PdfExtractWorker
from .pdftrio import PdfTrioBlobWorker, PdfTrioClient, PdfTrioWorker
-from .persist import (PersistCdxWorker, PersistGrobidDiskWorker, PersistGrobidWorker,
- PersistIngestFileResultWorker, PersistIngestRequestWorker,
- PersistPdfTextWorker, PersistPdfTrioWorker, PersistThumbnailWorker)
-from .workers import (BlackholeSink, CdxLinePusher, JsonLinePusher, KafkaCompressSink,
- KafkaJsonPusher, KafkaSink, MultiprocessWrapper, ZipfilePusher)
+from .persist import (
+ PersistCdxWorker,
+ PersistGrobidDiskWorker,
+ PersistGrobidWorker,
+ PersistIngestFileResultWorker,
+ PersistIngestRequestWorker,
+ PersistPdfTextWorker,
+ PersistPdfTrioWorker,
+ PersistThumbnailWorker,
+)
+from .workers import (
+ BlackholeSink,
+ CdxLinePusher,
+ JsonLinePusher,
+ KafkaCompressSink,
+ KafkaJsonPusher,
+ KafkaSink,
+ MultiprocessWrapper,
+ ZipfilePusher,
+)
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index ee4d3bf..69d2116 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -12,12 +12,12 @@ class SandcrawlerPostgrestClient:
self.api_url = api_url
def get_cdx(self, url: str) -> Optional[dict]:
- resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.' + url))
+ resp = requests.get(self.api_url + "/cdx", params=dict(url="eq." + url))
resp.raise_for_status()
return resp.json() or None
def get_grobid(self, sha1: str) -> Optional[dict]:
- resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.' + sha1))
+ resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex="eq." + sha1))
resp.raise_for_status()
resp_json = resp.json()
if resp_json:
@@ -26,7 +26,7 @@ class SandcrawlerPostgrestClient:
return None
def get_pdftrio(self, sha1: str) -> Optional[dict]:
- resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.' + sha1))
+ resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex="eq." + sha1))
resp.raise_for_status()
resp_json = resp.json()
if resp_json:
@@ -35,7 +35,7 @@ class SandcrawlerPostgrestClient:
return None
def get_pdf_meta(self, sha1: str) -> Optional[dict]:
- resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.' + sha1))
+ resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex="eq." + sha1))
resp.raise_for_status()
resp_json = resp.json()
if resp_json:
@@ -56,7 +56,7 @@ class SandcrawlerPostgrestClient:
return None
def get_file_meta(self, sha1: str) -> Optional[dict]:
- resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.' + sha1))
+ resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex="eq." + sha1))
resp.raise_for_status()
resp_json = resp.json()
if resp_json:
@@ -89,7 +89,7 @@ class SandcrawlerPostgrestClient:
return None
def get_crossref(self, doi: str) -> Optional[dict]:
- resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.' + doi))
+ resp = requests.get(self.api_url + "/crossref", params=dict(doi="eq." + doi))
resp.raise_for_status()
resp_json = resp.json()
if resp_json:
@@ -117,10 +117,12 @@ class SandcrawlerPostgresClient:
updates = 0
return (inserts, updates)
- def insert_cdx(self,
- cur: psycopg2.extensions.cursor,
- batch: List[Dict[str, Any]],
- on_conflict: str = "nothing") -> Tuple[int, int]:
+ def insert_cdx(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset)
@@ -133,11 +135,21 @@ class SandcrawlerPostgresClient:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [d for d in batch if d.get('warc_path')]
+ batch = [d for d in batch if d.get("warc_path")]
if not batch:
return (0, 0)
- rows = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'],
- int(d['warc_csize']), int(d['warc_offset'])) for d in batch]
+ rows = [
+ (
+ d["url"],
+ d["datetime"],
+ d["sha1hex"],
+ d["mimetype"],
+ d["warc_path"],
+ int(d["warc_csize"]),
+ int(d["warc_offset"]),
+ )
+ for d in batch
+ ]
# filter out duplicate rows by key (url, datetime)
row_dict = dict()
for b in rows:
@@ -146,10 +158,12 @@ class SandcrawlerPostgresClient:
resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_file_meta(self,
- cur: psycopg2.extensions.cursor,
- batch: List[Dict[str, Any]],
- on_conflict: str = "nothing") -> Tuple[int, int]:
+ def insert_file_meta(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
file_meta(sha1hex, sha256hex, md5hex, size_bytes, mimetype)
@@ -168,8 +182,10 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- rows = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']), d['mimetype'])
- for d in batch]
+ rows = [
+ (d["sha1hex"], d["sha256hex"], d["md5hex"], int(d["size_bytes"]), d["mimetype"])
+ for d in batch
+ ]
# filter out duplicate rows by key (sha1hex)
row_dict = dict()
for b in rows:
@@ -178,10 +194,12 @@ class SandcrawlerPostgresClient:
resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_grobid(self,
- cur: psycopg2.extensions.cursor,
- batch: List[Dict[str, Any]],
- on_conflict: str = "nothing") -> Tuple[int, int]:
+ def insert_grobid(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
grobid (sha1hex, grobid_version, status_code, status, fatcat_release, updated, metadata)
@@ -203,24 +221,27 @@ class SandcrawlerPostgresClient:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
for r in batch:
- if r.get('metadata'):
+ if r.get("metadata"):
# sometimes these are only in metadata; shouldn't pass through
# though (to save database space)
- dupe_fields = ('fatcat_release', 'grobid_version')
+ dupe_fields = ("fatcat_release", "grobid_version")
for k in dupe_fields:
if k not in r:
- r[k] = r['metadata'].get(k)
- r['metadata'].pop(k, None)
- r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
- rows = [(
- d['key'],
- d.get('grobid_version') or None,
- d['status_code'],
- d['status'],
- d.get('fatcat_release') or None,
- d.get('updated') or datetime.datetime.now(),
- d.get('metadata') or None,
- ) for d in batch]
+ r[k] = r["metadata"].get(k)
+ r["metadata"].pop(k, None)
+ r["metadata"] = json.dumps(r["metadata"], sort_keys=True)
+ rows = [
+ (
+ d["key"],
+ d.get("grobid_version") or None,
+ d["status_code"],
+ d["status"],
+ d.get("fatcat_release") or None,
+ d.get("updated") or datetime.datetime.now(),
+ d.get("metadata") or None,
+ )
+ for d in batch
+ ]
# filter out duplicate rows by key (sha1hex)
row_dict = dict()
for b in rows:
@@ -229,10 +250,9 @@ class SandcrawlerPostgresClient:
resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_pdf_meta(self,
- cur: psycopg2.extensions.cursor,
- rows: List[Tuple],
- on_conflict: str = "nothing") -> Tuple[int, int]:
+ def insert_pdf_meta(
+ self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing"
+ ) -> Tuple[int, int]:
"""
batch elements are expected to have .to_sql_tuple() method
"""
@@ -269,10 +289,9 @@ class SandcrawlerPostgresClient:
resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_html_meta(self,
- cur: psycopg2.extensions.cursor,
- rows: List[Tuple],
- on_conflict: str = "nothing") -> Tuple[int, int]:
+ def insert_html_meta(
+ self, cur: psycopg2.extensions.cursor, rows: List[Tuple], on_conflict: str = "nothing"
+ ) -> Tuple[int, int]:
"""
batch elements are expected to have .to_sql_tuple() method
"""
@@ -306,10 +325,12 @@ class SandcrawlerPostgresClient:
resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_pdftrio(self,
- cur: psycopg2.extensions.cursor,
- batch: List[Dict[str, Any]],
- on_conflict: str = "nothing") -> Tuple[int, int]:
+ def insert_pdftrio(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
pdftrio (sha1hex, updated, status_code, status, pdftrio_version,
@@ -335,18 +356,21 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- rows = [(
- d['key'],
- d.get('updated') or datetime.datetime.now(),
- d['status_code'],
- d['status'],
- d.get('versions', {}).get('pdftrio_version') or None,
- d.get('versions', {}).get('models_date') or None,
- d.get('ensemble_score'),
- d.get('bert_score'),
- d.get('linear_score'),
- d.get('image_score'),
- ) for d in batch]
+ rows = [
+ (
+ d["key"],
+ d.get("updated") or datetime.datetime.now(),
+ d["status_code"],
+ d["status"],
+ d.get("versions", {}).get("pdftrio_version") or None,
+ d.get("versions", {}).get("models_date") or None,
+ d.get("ensemble_score"),
+ d.get("bert_score"),
+ d.get("linear_score"),
+ d.get("image_score"),
+ )
+ for d in batch
+ ]
# filter out duplicate rows by key (sha1hex)
row_dict = dict()
for b in rows:
@@ -355,10 +379,12 @@ class SandcrawlerPostgresClient:
resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_ingest_request(self,
- cur: psycopg2.extensions.cursor,
- batch: List[Dict[str, Any]],
- on_conflict: str = "nothing") -> Tuple[int, int]:
+ def insert_ingest_request(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
ingest_request (link_source, link_source_id, ingest_type, base_url, ingest_request_source, release_stage, request)
@@ -372,21 +398,24 @@ class SandcrawlerPostgresClient:
sql += " RETURNING xmax;"
for r in batch:
# in case these fields were already packed into 'request'
- extra = r.get('request', {})
- for k in ('ext_ids', 'fatcat_release', 'edit_extra', 'rel'):
+ extra = r.get("request", {})
+ for k in ("ext_ids", "fatcat_release", "edit_extra", "rel"):
if r.get(k):
extra[k] = r[k]
if extra:
- r['extra'] = json.dumps(extra, sort_keys=True)
- rows = [(
- d['link_source'],
- d['link_source_id'],
- d['ingest_type'],
- d['base_url'],
- d.get('ingest_request_source'),
- d.get('release_stage') or None,
- d.get('extra') or None,
- ) for d in batch]
+ r["extra"] = json.dumps(extra, sort_keys=True)
+ rows = [
+ (
+ d["link_source"],
+ d["link_source_id"],
+ d["ingest_type"],
+ d["base_url"],
+ d.get("ingest_request_source"),
+ d.get("release_stage") or None,
+ d.get("extra") or None,
+ )
+ for d in batch
+ ]
# filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url)
row_dict = dict()
for b in rows:
@@ -395,10 +424,12 @@ class SandcrawlerPostgresClient:
resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_ingest_file_result(self,
- cur: psycopg2.extensions.cursor,
- batch: List[Dict[str, Any]],
- on_conflict: str = "nothing") -> Tuple[int, int]:
+ def insert_ingest_file_result(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
@@ -420,16 +451,19 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- rows = [(
- d['ingest_type'],
- d['base_url'],
- bool(d['hit']),
- d['status'],
- d.get('terminal_url'),
- d.get('terminal_dt'),
- d.get('terminal_status_code'),
- d.get('terminal_sha1hex'),
- ) for d in batch]
+ rows = [
+ (
+ d["ingest_type"],
+ d["base_url"],
+ bool(d["hit"]),
+ d["status"],
+ d.get("terminal_url"),
+ d.get("terminal_dt"),
+ d.get("terminal_status_code"),
+ d.get("terminal_sha1hex"),
+ )
+ for d in batch
+ ]
# filter out duplicate rows by key (ingest_type, base_url)
row_dict = dict()
for b in rows:
@@ -438,10 +472,12 @@ class SandcrawlerPostgresClient:
resp = psycopg2.extras.execute_values(cur, sql, rows, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
- def insert_ingest_fileset_platform(self,
- cur: psycopg2.extensions.cursor,
- batch: List[Dict[str, Any]],
- on_conflict: str = "nothing") -> Tuple[int, int]:
+ def insert_ingest_fileset_platform(
+ self,
+ cur: psycopg2.extensions.cursor,
+ batch: List[Dict[str, Any]],
+ on_conflict: str = "nothing",
+ ) -> Tuple[int, int]:
sql = """
INSERT INTO
ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest)
@@ -470,23 +506,26 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- rows = [(
- d['ingest_type'],
- d['base_url'],
- bool(d['hit']),
- d['status'],
- d.get('platform_name'),
- d.get('platform_domain'),
- d.get('platform_id'),
- d.get('ingest_strategy'),
- d.get('total_size'),
- d.get('file_count'),
- d.get('archiveorg_item_name'),
- d.get('archiveorg_item_bundle_path'),
- d.get('web_bundle_url'),
- d.get('web_bundle_dt'),
- d.get('manifest'),
- ) for d in batch]
+ rows = [
+ (
+ d["ingest_type"],
+ d["base_url"],
+ bool(d["hit"]),
+ d["status"],
+ d.get("platform_name"),
+ d.get("platform_domain"),
+ d.get("platform_id"),
+ d.get("ingest_strategy"),
+ d.get("total_size"),
+ d.get("file_count"),
+ d.get("archiveorg_item_name"),
+ d.get("archiveorg_item_bundle_path"),
+ d.get("web_bundle_url"),
+ d.get("web_bundle_dt"),
+ d.get("manifest"),
+ )
+ for d in batch
+ ]
# filter out duplicate rows by key (ingest_type, base_url)
row_dict = dict()
for b in rows:
diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py
index 86e3ff2..fbe8066 100644
--- a/python/sandcrawler/fileset_platforms.py
+++ b/python/sandcrawler/fileset_platforms.py
@@ -4,25 +4,38 @@ from typing import Optional, Tuple
import internetarchive
import requests
-from sandcrawler.fileset_types import (FilesetManifestFile, FilesetPlatformItem, IngestStrategy,
- PlatformRestrictedError, PlatformScopeError)
+from sandcrawler.fileset_types import (
+ FilesetManifestFile,
+ FilesetPlatformItem,
+ IngestStrategy,
+ PlatformRestrictedError,
+ PlatformScopeError,
+)
from sandcrawler.html_metadata import BiblioMetadata
from sandcrawler.ia import ResourceResult
-class FilesetPlatformHelper():
+class FilesetPlatformHelper:
def __init__(self):
- self.platform_name = 'unknown'
-
- def match_request(self, request: dict, resource: Optional[ResourceResult],
- html_biblio: Optional[BiblioMetadata]) -> bool:
+ self.platform_name = "unknown"
+
+ def match_request(
+ self,
+ request: dict,
+ resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata],
+ ) -> bool:
"""
Does this request look like it matches this platform?
"""
raise NotImplementedError()
- def process_request(self, request: dict, resource: Optional[ResourceResult],
- html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(
+ self,
+ request: dict,
+ resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata],
+ ) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -47,7 +60,7 @@ class FilesetPlatformHelper():
class DataverseHelper(FilesetPlatformHelper):
def __init__(self):
super().__init__()
- self.platform_name = 'dataverse'
+ self.platform_name = "dataverse"
self.session = requests.Session()
@staticmethod
@@ -69,16 +82,16 @@ class DataverseHelper(FilesetPlatformHelper):
If there is an error parsing, raises a ValueError
"""
id_type = None
- if pid.startswith('doi:10.'):
- id_type = 'doi'
+ if pid.startswith("doi:10."):
+ id_type = "doi"
pid = pid[4:]
- elif pid.startswith('hdl:'):
- id_type = 'hdl'
+ elif pid.startswith("hdl:"):
+ id_type = "hdl"
pid = pid[4:]
else:
raise ValueError(f"unknown dataverse persistentId format: {pid}")
- comp = pid.split('/')
+ comp = pid.split("/")
if len(comp) < 2:
raise ValueError(f"unknown dataverse persistentId format: {pid}")
@@ -114,19 +127,23 @@ class DataverseHelper(FilesetPlatformHelper):
"file_id": file_id,
}
- def match_request(self, request: dict, resource: Optional[ResourceResult],
- html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(
+ self,
+ request: dict,
+ resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata],
+ ) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
else:
- url = request['base_url']
+ url = request["base_url"]
# TODO: could also do HTML platform detection or something?
components = urllib.parse.urlparse(url)
# platform_domain = components.netloc.split(':')[0].lower()
params = urllib.parse.parse_qs(components.query)
- id_param = params.get('persistentId')
+ id_param = params.get("persistentId")
if not id_param:
return False
platform_id = id_param[0]
@@ -138,8 +155,12 @@ class DataverseHelper(FilesetPlatformHelper):
return True
- def process_request(self, request: dict, resource: Optional[ResourceResult],
- html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(
+ self,
+ request: dict,
+ resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata],
+ ) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
@@ -150,17 +171,17 @@ class DataverseHelper(FilesetPlatformHelper):
if resource and resource.terminal_url:
url = resource.terminal_url
else:
- url = request['base_url']
+ url = request["base_url"]
# 1. extract domain, PID, and version from URL
components = urllib.parse.urlparse(url)
- platform_domain = components.netloc.split(':')[0].lower()
+ platform_domain = components.netloc.split(":")[0].lower()
params = urllib.parse.parse_qs(components.query)
- id_param = params.get('persistentId')
+ id_param = params.get("persistentId")
if not (id_param and id_param[0]):
raise PlatformScopeError("Expected a Dataverse persistentId in URL")
platform_id = id_param[0]
- version_param = params.get('version')
+ version_param = params.get("version")
dataset_version = None
if version_param:
dataset_version = version_param[0]
@@ -170,10 +191,11 @@ class DataverseHelper(FilesetPlatformHelper):
except ValueError:
raise PlatformScopeError("not actually in scope")
- if parsed_id['file_id']:
+ if parsed_id["file_id"]:
# TODO: maybe we could support this?
raise PlatformScopeError(
- "only entire dataverse datasets can be archived with this tool")
+ "only entire dataverse datasets can be archived with this tool"
+ )
# 1b. if we didn't get a version number from URL, fetch it from API
if not dataset_version:
@@ -182,8 +204,10 @@ class DataverseHelper(FilesetPlatformHelper):
)
resp.raise_for_status()
obj = resp.json()
- obj_latest = obj['data']['latestVersion']
- dataset_version = f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}"
+ obj_latest = obj["data"]["latestVersion"]
+ dataset_version = (
+ f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}"
+ )
# 2. API fetch
resp = self.session.get(
@@ -192,69 +216,72 @@ class DataverseHelper(FilesetPlatformHelper):
resp.raise_for_status()
obj = resp.json()
- obj_latest = obj['data']['latestVersion']
- assert dataset_version == f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}"
- assert platform_id == obj_latest['datasetPersistentId']
+ obj_latest = obj["data"]["latestVersion"]
+ assert (
+ dataset_version
+ == f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}"
+ )
+ assert platform_id == obj_latest["datasetPersistentId"]
manifest = []
- for row in obj_latest['files']:
- df = row['dataFile']
- df_persistent_id = df['persistentId']
+ for row in obj_latest["files"]:
+ df = row["dataFile"]
+ df_persistent_id = df["persistentId"]
platform_url = f"https://{platform_domain}/api/access/datafile/:persistentId/?persistentId={df_persistent_id}"
- if df.get('originalFileName'):
- platform_url += '&format=original'
+ if df.get("originalFileName"):
+ platform_url += "&format=original"
extra = dict()
# TODO: always save the version field?
- if row.get('version') != 1:
- extra['version'] = row['version']
- if 'description' in df:
- extra['description'] = df['description']
+ if row.get("version") != 1:
+ extra["version"] = row["version"]
+ if "description" in df:
+ extra["description"] = df["description"]
manifest.append(
FilesetManifestFile(
- path=df.get('originalFileName') or df['filename'],
- size=df.get('originalFileSize') or df['filesize'],
- md5=df['md5'],
+ path=df.get("originalFileName") or df["filename"],
+ size=df.get("originalFileSize") or df["filesize"],
+ md5=df["md5"],
# NOTE: don't get: sha1, sha256
- mimetype=df['contentType'],
+ mimetype=df["contentType"],
platform_url=platform_url,
extra=extra or None,
- ))
+ )
+ )
- platform_sub_id = platform_id.split('/')[-1]
+ platform_sub_id = platform_id.split("/")[-1]
archiveorg_item_name = f"{platform_domain}-{platform_sub_id}-v{dataset_version}"
archiveorg_item_meta = dict(
# TODO: collection=platform_domain,
collection="datasets",
- date=obj_latest['releaseTime'].split('T')[0],
- source=
- f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}",
+ date=obj_latest["releaseTime"].split("T")[0],
+ source=f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}",
)
- if platform_id.startswith('doi:10.'):
- archiveorg_item_meta['doi'] = platform_id.replace('doi:', '')
- for block in obj_latest['metadataBlocks']['citation']['fields']:
- if block['typeName'] == 'title':
- archiveorg_item_meta['title'] = block['value']
- elif block['typeName'] == 'depositor':
- archiveorg_item_meta['creator'] = block['value']
- elif block['typeName'] == 'dsDescription':
- archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue'][
- 'value']
-
- archiveorg_item_meta['description'] = archiveorg_item_meta.get('description', '')
- if obj_latest.get('termsOfUse'):
- archiveorg_item_meta['description'] += '\n<br>\n' + obj_latest['termsOfUse']
+ if platform_id.startswith("doi:10."):
+ archiveorg_item_meta["doi"] = platform_id.replace("doi:", "")
+ for block in obj_latest["metadataBlocks"]["citation"]["fields"]:
+ if block["typeName"] == "title":
+ archiveorg_item_meta["title"] = block["value"]
+ elif block["typeName"] == "depositor":
+ archiveorg_item_meta["creator"] = block["value"]
+ elif block["typeName"] == "dsDescription":
+ archiveorg_item_meta["description"] = block["value"][0]["dsDescriptionValue"][
+ "value"
+ ]
+
+ archiveorg_item_meta["description"] = archiveorg_item_meta.get("description", "")
+ if obj_latest.get("termsOfUse"):
+ archiveorg_item_meta["description"] += "\n<br>\n" + obj_latest["termsOfUse"]
return FilesetPlatformItem(
platform_name=self.platform_name,
- platform_status='success',
+ platform_status="success",
manifest=manifest,
platform_domain=platform_domain,
platform_id=platform_id,
archiveorg_item_name=archiveorg_item_name,
archiveorg_item_meta=archiveorg_item_meta,
- web_bundle_url=
- f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original",
+ web_bundle_url=f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original",
# TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files)
extra=dict(version=dataset_version),
)
@@ -301,7 +328,7 @@ def test_parse_dataverse_persistentid() -> None:
}
invalid = [
- #"doi:10.5072/FK2/J8SJZB/LL6WXZ",
+ # "doi:10.5072/FK2/J8SJZB/LL6WXZ",
"doi:10.25625/abcd",
"other:10.25625/LL6WXZ",
"10.25625/LL6WXZ",
@@ -322,7 +349,7 @@ def test_parse_dataverse_persistentid() -> None:
class FigshareHelper(FilesetPlatformHelper):
def __init__(self):
super().__init__()
- self.platform_name = 'figshare'
+ self.platform_name = "figshare"
self.session = requests.Session()
@staticmethod
@@ -337,13 +364,13 @@ class FigshareHelper(FilesetPlatformHelper):
# eg: /articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1
# /articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4
- comp = path.split('/')
- if len(comp) < 4 or comp[1] != 'articles':
+ comp = path.split("/")
+ if len(comp) < 4 or comp[1] != "articles":
raise ValueError(f"not a figshare URL: {path}")
comp = comp[2:]
if comp[0] in [
- 'dataset',
+ "dataset",
]:
comp = comp[1:]
@@ -354,19 +381,23 @@ class FigshareHelper(FilesetPlatformHelper):
else:
raise ValueError(f"couldn't find figshare identiier: {path}")
- def match_request(self, request: dict, resource: Optional[ResourceResult],
- html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(
+ self,
+ request: dict,
+ resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata],
+ ) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
else:
- url = request['base_url']
+ url = request["base_url"]
components = urllib.parse.urlparse(url)
- platform_domain = components.netloc.split(':')[0].lower()
+ platform_domain = components.netloc.split(":")[0].lower()
# only work with full, versioned figshare.com URLs
- if 'figshare.com' not in platform_domain:
+ if "figshare.com" not in platform_domain:
return False
try:
@@ -380,8 +411,12 @@ class FigshareHelper(FilesetPlatformHelper):
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult],
- html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(
+ self,
+ request: dict,
+ resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata],
+ ) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -389,15 +424,16 @@ class FigshareHelper(FilesetPlatformHelper):
if resource and resource.terminal_url:
url = resource.terminal_url
else:
- url = request['base_url']
+ url = request["base_url"]
# 1. extract domain, PID, and version from URL
components = urllib.parse.urlparse(url)
- platform_domain = components.netloc.split(':')[0].lower()
+ platform_domain = components.netloc.split(":")[0].lower()
(platform_id, dataset_version) = self.parse_figshare_url_path(components.path)
assert platform_id.isdigit(), f"expected numeric: {platform_id}"
- assert dataset_version and dataset_version.isdigit(
+ assert (
+ dataset_version and dataset_version.isdigit()
), f"expected numeric: {dataset_version}"
# 1b. if we didn't get a version number from URL, fetch it from API
@@ -405,59 +441,60 @@ class FigshareHelper(FilesetPlatformHelper):
# 2. API fetch
resp = self.session.get(
- f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}")
+ f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}"
+ )
resp.raise_for_status()
obj = resp.json()
# figshare_type = obj['defined_type_name']
- if not obj['is_public']:
- raise PlatformRestrictedError(f'record not public: {platform_id} {dataset_version}')
- if obj['is_embargoed']:
+ if not obj["is_public"]:
+ raise PlatformRestrictedError(f"record not public: {platform_id} {dataset_version}")
+ if obj["is_embargoed"]:
raise PlatformRestrictedError(
f'record is embargoed: {obj.get("embargo_title")} ({platform_id} {dataset_version})'
)
manifest = []
- for row in obj['files']:
+ for row in obj["files"]:
manifest.append(
FilesetManifestFile(
- path=row['name'],
- size=row['size'],
- md5=row['computed_md5'],
+ path=row["name"],
+ size=row["size"],
+ md5=row["computed_md5"],
# NOTE: don't get: sha1, sha256, mimetype
- platform_url=row['download_url'],
- #extra=dict(),
- ))
- assert not row.get('is_link_only')
+ platform_url=row["download_url"],
+ # extra=dict(),
+ )
+ )
+ assert not row.get("is_link_only")
authors = []
- for author in obj['authors']:
- authors.append(author['full_name'])
+ for author in obj["authors"]:
+ authors.append(author["full_name"])
archiveorg_item_name = f"{platform_domain}-{platform_id}-v{dataset_version}"
archiveorg_item_meta = dict(
# TODO: collection=platform_domain,
collection="datasets",
creator=authors,
- doi=obj['doi'],
- title=obj['title'],
- date=obj['published_date'],
- source=obj['url_public_html'],
- description=obj['description'],
- license=obj['license']['url'],
- version=obj['version'],
+ doi=obj["doi"],
+ title=obj["title"],
+ date=obj["published_date"],
+ source=obj["url_public_html"],
+ description=obj["description"],
+ license=obj["license"]["url"],
+ version=obj["version"],
)
return FilesetPlatformItem(
platform_name=self.platform_name,
- platform_status='success',
+ platform_status="success",
manifest=manifest,
platform_domain=platform_domain,
platform_id=platform_id,
archiveorg_item_name=archiveorg_item_name,
archiveorg_item_meta=archiveorg_item_meta,
- web_bundle_url=
- f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}",
+ web_bundle_url=f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}",
# TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files)
extra=dict(version=dataset_version),
)
@@ -466,13 +503,19 @@ class FigshareHelper(FilesetPlatformHelper):
def test_parse_figshare_url_path() -> None:
valid = {
- "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1":
- ("8987858", "1"),
- "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858":
- ("8987858", None),
+ "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1": (
+ "8987858",
+ "1",
+ ),
+ "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858": (
+ "8987858",
+ None,
+ ),
"/articles/CIBERSORT_p-value_0_05/8217188/1": ("8217188", "1"),
- "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4":
- ("12127176", "4"),
+ "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4": (
+ "12127176",
+ "4",
+ ),
}
invalid = [
@@ -493,25 +536,33 @@ def test_parse_figshare_url_path() -> None:
class ZenodoHelper(FilesetPlatformHelper):
def __init__(self):
super().__init__()
- self.platform_name = 'zenodo'
+ self.platform_name = "zenodo"
self.session = requests.Session()
- def match_request(self, request: dict, resource: Optional[ResourceResult],
- html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(
+ self,
+ request: dict,
+ resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata],
+ ) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
else:
- url = request['base_url']
+ url = request["base_url"]
components = urllib.parse.urlparse(url)
- platform_domain = components.netloc.split(':')[0].lower()
- if platform_domain == 'zenodo.org' and '/record/' in components.path:
+ platform_domain = components.netloc.split(":")[0].lower()
+ if platform_domain == "zenodo.org" and "/record/" in components.path:
return True
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult],
- html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(
+ self,
+ request: dict,
+ resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata],
+ ) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -519,7 +570,7 @@ class ZenodoHelper(FilesetPlatformHelper):
if resource and resource.terminal_url:
url = resource.terminal_url
else:
- url = request['base_url']
+ url = request["base_url"]
# TODO: also look in base_url and resource-non-terminal for ident? to
# check for work-level redirects
@@ -527,118 +578,118 @@ class ZenodoHelper(FilesetPlatformHelper):
# 1. extract identifier from URL
# eg: https://zenodo.org/record/5230255
components = urllib.parse.urlparse(url)
- platform_domain = components.netloc.split(':')[0].lower()
- if len(components.path.split('/')) < 2:
+ platform_domain = components.netloc.split(":")[0].lower()
+ if len(components.path.split("/")) < 2:
raise PlatformScopeError("Expected a complete, versioned figshare URL")
- platform_id = components.path.split('/')[2]
+ platform_id = components.path.split("/")[2]
assert platform_id.isdigit(), f"expected numeric: {platform_id}"
- if 'zenodo.org' not in platform_domain:
+ if "zenodo.org" not in platform_domain:
raise PlatformScopeError(f"unexpected zenodo.org domain: {platform_domain}")
# 2. API fetch
resp = self.session.get(f"https://zenodo.org/api/records/{platform_id}")
if resp.status_code == 410:
- raise PlatformRestrictedError('record deleted')
+ raise PlatformRestrictedError("record deleted")
resp.raise_for_status()
obj = resp.json()
- assert obj['id'] == int(platform_id)
- work_id = obj['conceptrecid']
- if work_id == obj['id']:
+ assert obj["id"] == int(platform_id)
+ work_id = obj["conceptrecid"]
+ if work_id == obj["id"]:
raise PlatformScopeError(
- "got a work-level zenodo record, not a versioned record: {work_id}")
+ "got a work-level zenodo record, not a versioned record: {work_id}"
+ )
# zenodo_type = obj['metadata']['resource_type']['type']
- if obj['metadata']['access_right'] != 'open':
+ if obj["metadata"]["access_right"] != "open":
raise PlatformRestrictedError(
"not publicly available ({obj['metadata']['access_right']}): {platform_domain} {platform_id}"
)
manifest = []
- for row in obj['files']:
+ for row in obj["files"]:
mf = FilesetManifestFile(
- path=row['key'],
- size=row['size'],
- platform_url=row['links']['self'],
- #extra=dict(),
+ path=row["key"],
+ size=row["size"],
+ platform_url=row["links"]["self"],
+ # extra=dict(),
)
- checksum = row['checksum']
+ checksum = row["checksum"]
# eg: md5:35ffcab905f8224556dba76648cb7dad
- if checksum.startswith('md5:'):
+ if checksum.startswith("md5:"):
mf.md5 = checksum[4:]
- elif checksum.startswith('sha1:'):
+ elif checksum.startswith("sha1:"):
mf.sha1 = checksum[45]
manifest.append(mf)
authors = []
- for author in obj['metadata']['creators']:
- authors.append(author['name'])
+ for author in obj["metadata"]["creators"]:
+ authors.append(author["name"])
archiveorg_item_name = f"{platform_domain}-{platform_id}"
archiveorg_item_meta = dict(
# TODO: collection=platform_domain,
collection="datasets",
creator=authors,
- doi=obj['doi'],
- title=obj['metadata']['title'],
- date=obj['metadata']['publication_date'],
- source=obj['links']['html'],
- description=obj['metadata']['description'],
- license=obj['metadata']['license']['id'],
- version=obj['revision'],
+ doi=obj["doi"],
+ title=obj["metadata"]["title"],
+ date=obj["metadata"]["publication_date"],
+ source=obj["links"]["html"],
+ description=obj["metadata"]["description"],
+ license=obj["metadata"]["license"]["id"],
+ version=obj["revision"],
# obj['metadata']['version'] is, eg, git version tag
)
return FilesetPlatformItem(
platform_name=self.platform_name,
- platform_status='success',
+ platform_status="success",
manifest=manifest,
platform_domain=platform_domain,
platform_id=platform_id,
archiveorg_item_name=archiveorg_item_name,
archiveorg_item_meta=archiveorg_item_meta,
- #web_bundle_url=f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}",
+ # web_bundle_url=f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}",
# TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files)
- extra=dict(version=obj['revision']),
+ extra=dict(version=obj["revision"]),
)
class ArchiveOrgHelper(FilesetPlatformHelper):
FORMAT_TO_MIMETYPE = {
- 'BZIP': 'application/x-bzip',
- 'BZIP2': 'application/x-bzip2',
- 'ZIP': 'application/zip',
- 'GZIP': 'application/gzip',
- 'RAR': 'application/vnd.rar',
- 'TAR': 'application/x-tar',
- '7z': 'application/x-7z-compressed',
- 'HTML': 'text/html',
- 'Text': 'text/plain',
- 'PDF': 'application/pdf',
- 'CSV': 'text/csv',
- 'XML': 'application/xml',
- 'JSON': 'application/json',
-
+ "BZIP": "application/x-bzip",
+ "BZIP2": "application/x-bzip2",
+ "ZIP": "application/zip",
+ "GZIP": "application/gzip",
+ "RAR": "application/vnd.rar",
+ "TAR": "application/x-tar",
+ "7z": "application/x-7z-compressed",
+ "HTML": "text/html",
+ "Text": "text/plain",
+ "PDF": "application/pdf",
+ "CSV": "text/csv",
+ "XML": "application/xml",
+ "JSON": "application/json",
#'application/msword (.doc)', # .doc
#'application/vnd.openxmlformats-officedocument.wordprocessingml.document', # .docx
#'application/vnd.ms-excel', # .xls
#'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx
- 'MP3': 'audio/mpeg', # .mp3
- 'MP4': 'video/mp4', # .mp4
- 'MPEG': 'video/mpeg', # .mpeg
- 'JPEG': 'image/jpeg',
- 'GIF': 'image/gif',
- 'PNG': 'image/png',
- 'TIFF': 'image/tiff',
- 'Unknown': None,
+ "MP3": "audio/mpeg", # .mp3
+ "MP4": "video/mp4", # .mp4
+ "MPEG": "video/mpeg", # .mpeg
+ "JPEG": "image/jpeg",
+ "GIF": "image/gif",
+ "PNG": "image/png",
+ "TIFF": "image/tiff",
+ "Unknown": None,
}
def __init__(self):
super().__init__()
- self.platform_name = 'archiveorg'
+ self.platform_name = "archiveorg"
self.session = internetarchive.get_session()
@staticmethod
@@ -646,69 +697,79 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
"""
Filters IA API files
"""
- if f.source != 'original':
+ if f.source != "original":
return False
for suffix in [
- '_meta.sqlite',
- '_archive.torrent',
- '_itemimage.jpg',
- '_meta.xml',
- '_thumb.png',
- '_files.xml',
+ "_meta.sqlite",
+ "_archive.torrent",
+ "_itemimage.jpg",
+ "_meta.xml",
+ "_thumb.png",
+ "_files.xml",
]:
if f.name == item_name + suffix or f.name == item_name.lower() + suffix:
return False
- if f.name.startswith('_'):
+ if f.name.startswith("_"):
return False
- if item_name.startswith('academictorrents_'):
+ if item_name.startswith("academictorrents_"):
for suffix in [
- '_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib'
+ "_academictorrents.torrent",
+ "_academictorrents_torrent.txt",
+ ".bib",
]:
if f.name == item_name + suffix:
return False
return True
- def match_request(self, request: dict, resource: Optional[ResourceResult],
- html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(
+ self,
+ request: dict,
+ resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata],
+ ) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
else:
- url = request['base_url']
+ url = request["base_url"]
patterns = [
- '://archive.org/details/',
- '://archive.org/download/',
+ "://archive.org/details/",
+ "://archive.org/download/",
]
for p in patterns:
if p in url:
return True
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult],
- html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(
+ self,
+ request: dict,
+ resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata],
+ ) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
- base_url_split = request['base_url'].split('/')
- #print(base_url_split, file=sys.stderr)
+ base_url_split = request["base_url"].split("/")
+ # print(base_url_split, file=sys.stderr)
assert len(base_url_split) in [5, 6]
- assert base_url_split[0] in ['http:', 'https:']
- assert base_url_split[2] == 'archive.org'
- assert base_url_split[3] in ['details', 'download']
+ assert base_url_split[0] in ["http:", "https:"]
+ assert base_url_split[2] == "archive.org"
+ assert base_url_split[3] in ["details", "download"]
item_name = base_url_split[4]
if len(base_url_split) == 6 and base_url_split[5]:
raise PlatformScopeError(
"got an archive.org file path, not download/details page; individual files not handled yet"
)
- #print(f" archiveorg processing item={item_name}", file=sys.stderr)
+ # print(f" archiveorg processing item={item_name}", file=sys.stderr)
item = self.session.get_item(item_name)
item_name = item.identifier
- item_collection = item.metadata['collection']
+ item_collection = item.metadata["collection"]
if type(item_collection) == list:
item_collection = item_collection[0]
- assert item.metadata['mediatype'] not in ['collection', 'web']
+ assert item.metadata["mediatype"] not in ["collection", "web"]
item_files = item.get_files(on_the_fly=False)
item_files = [f for f in item_files if self.want_item_file(f, item_name)]
manifest = []
@@ -727,9 +788,9 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
return FilesetPlatformItem(
platform_name=self.platform_name,
- platform_status='success',
+ platform_status="success",
manifest=manifest,
- platform_domain='archive.org',
+ platform_domain="archive.org",
platform_id=item_name,
archiveorg_item_name=item_name,
archiveorg_meta=dict(collection=item_collection),
diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
index 9d3bae3..6dc77f9 100644
--- a/python/sandcrawler/fileset_strategies.py
+++ b/python/sandcrawler/fileset_strategies.py
@@ -5,15 +5,19 @@ from typing import Optional
import internetarchive
-from sandcrawler.fileset_types import (ArchiveStrategyResult, FilesetPlatformItem,
- IngestStrategy, PlatformScopeError)
+from sandcrawler.fileset_types import (
+ ArchiveStrategyResult,
+ FilesetPlatformItem,
+ IngestStrategy,
+ PlatformScopeError,
+)
from sandcrawler.ia import SavePageNowClient, WaybackClient, fix_transfer_encoding
from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path, sanitize_fs_path
-class FilesetIngestStrategy():
+class FilesetIngestStrategy:
def __init__(self):
- #self.ingest_strategy = 'unknown'
+ # self.ingest_strategy = 'unknown'
self.success_status = "success"
def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]:
@@ -29,8 +33,8 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
self.ingest_strategy = IngestStrategy.ArchiveorgFileset
# TODO: enable cleanup when confident (eg, safe path parsing)
- self.skip_cleanup_local_files = kwargs.get('skip_cleanup_local_files', True)
- self.working_dir = os.environ.get('SANDCRAWLER_WORKING_DIR', '/tmp/sandcrawler/')
+ self.skip_cleanup_local_files = kwargs.get("skip_cleanup_local_files", True)
+ self.working_dir = os.environ.get("SANDCRAWLER_WORKING_DIR", "/tmp/sandcrawler/")
try:
os.mkdir(self.working_dir)
except FileExistsError:
@@ -53,23 +57,29 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
found = False
for existing in item_files:
if existing.name == wanted.path:
- if ((existing.sha1 and existing.sha1 == wanted.sha1) or
- (existing.md5 and existing.md5 == wanted.md5)
- ) and existing.name == wanted.path and existing.size == wanted.size:
+ if (
+ (
+ (existing.sha1 and existing.sha1 == wanted.sha1)
+ or (existing.md5 and existing.md5 == wanted.md5)
+ )
+ and existing.name == wanted.path
+ and existing.size == wanted.size
+ ):
found = True
- wanted.status = 'exists'
+ wanted.status = "exists"
break
else:
- wanted.status = 'mismatch-existing'
+ wanted.status = "mismatch-existing"
break
if not found:
print(
f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}",
- file=sys.stderr)
+ file=sys.stderr,
+ )
return None
return ArchiveStrategyResult(
ingest_strategy=self.ingest_strategy,
- status='success-existing',
+ status="success-existing",
manifest=item.manifest,
)
@@ -81,12 +91,12 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
if existing:
return existing
- if item.platform_name == 'archiveorg':
+ if item.platform_name == "archiveorg":
raise PlatformScopeError("should't download archive.org into itself")
local_dir = self.working_dir + item.archiveorg_item_name
- assert local_dir.startswith('/')
- assert local_dir.count('/') > 2
+ assert local_dir.startswith("/")
+ assert local_dir.count("/") > 2
try:
os.mkdir(local_dir)
except FileExistsError:
@@ -96,71 +106,80 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
assert item.manifest
for m in item.manifest:
if m.path != sanitize_fs_path(m.path):
- m.status = 'unsafe-path'
+ m.status = "unsafe-path"
continue
- local_path = local_dir + '/' + m.path
+ local_path = local_dir + "/" + m.path
assert m.platform_url
if not os.path.exists(local_path):
print(f" downloading {m.path}", file=sys.stderr)
- with self.ia_session.get(m.platform_url, stream=True,
- allow_redirects=True) as r:
+ with self.ia_session.get(
+ m.platform_url, stream=True, allow_redirects=True
+ ) as r:
r.raise_for_status()
- with open(local_path + '.partial', 'wb') as f:
+ with open(local_path + ".partial", "wb") as f:
for chunk in r.iter_content(chunk_size=256 * 1024):
f.write(chunk)
- os.rename(local_path + '.partial', local_path)
- m.status = 'downloaded-local'
+ os.rename(local_path + ".partial", local_path)
+ m.status = "downloaded-local"
else:
- m.status = 'exists-local'
+ m.status = "exists-local"
print(f" verifying {m.path}", file=sys.stderr)
file_meta = gen_file_metadata_path(local_path, allow_empty=True)
- assert file_meta[
- 'size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}"
+ assert (
+ file_meta["size_bytes"] == m.size
+ ), f"expected: {m.size} found: {file_meta['size_bytes']}"
if m.sha1:
- assert file_meta['sha1hex'] == m.sha1
+ assert file_meta["sha1hex"] == m.sha1
else:
- m.sha1 = file_meta['sha1hex']
+ m.sha1 = file_meta["sha1hex"]
if m.sha256:
- assert file_meta['sha256hex'] == m.sha256
+ assert file_meta["sha256hex"] == m.sha256
else:
- m.sha256 = file_meta['sha256hex']
+ m.sha256 = file_meta["sha256hex"]
if m.md5:
- assert file_meta['md5hex'] == m.md5
+ assert file_meta["md5hex"] == m.md5
else:
- m.md5 = file_meta['md5hex']
+ m.md5 = file_meta["md5hex"]
if m.mimetype:
# 'magic' isn't good and parsing more detailed text file formats like text/csv
- if file_meta['mimetype'] != m.mimetype and file_meta['mimetype'] != 'text/plain':
+ if (
+ file_meta["mimetype"] != m.mimetype
+ and file_meta["mimetype"] != "text/plain"
+ ):
# these 'tab-separated-values' from dataverse are just noise, don't log them
- if m.mimetype != 'text/tab-separated-values':
+ if m.mimetype != "text/tab-separated-values":
print(
f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}",
- file=sys.stderr)
- m.mimetype = file_meta['mimetype']
+ file=sys.stderr,
+ )
+ m.mimetype = file_meta["mimetype"]
else:
- m.mimetype = file_meta['mimetype']
- m.status = 'verified-local'
+ m.mimetype = file_meta["mimetype"]
+ m.status = "verified-local"
# 2. upload all files, with metadata
- assert item.archiveorg_item_meta and item.archiveorg_item_meta['collection']
+ assert item.archiveorg_item_meta and item.archiveorg_item_meta["collection"]
item_files = []
for m in item.manifest:
- local_path = local_dir + '/' + m.path
- item_files.append({
- 'name': local_path,
- 'remote_name': m.path,
- })
+ local_path = local_dir + "/" + m.path
+ item_files.append(
+ {
+ "name": local_path,
+ "remote_name": m.path,
+ }
+ )
print(
f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...",
- file=sys.stderr)
+ file=sys.stderr,
+ )
internetarchive.upload(
item.archiveorg_item_name,
files=item_files,
@@ -171,7 +190,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
)
for m in item.manifest:
- m.status = 'success'
+ m.status = "success"
# 4. delete local directory
if not self.skip_cleanup_local_files:
@@ -191,6 +210,7 @@ class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy):
ArchiveorgFilesetStrategy currently works fine with individual files. Just
need to over-ride the ingest_strategy name.
"""
+
def __init__(self):
super().__init__()
self.ingest_strategy = IngestStrategy.ArchiveorgFileset
@@ -204,7 +224,8 @@ class WebFilesetStrategy(FilesetIngestStrategy):
self.wayback_client = WaybackClient()
self.try_spn2 = True
self.spn_client = SavePageNowClient(
- spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0))
+ spn_cdx_retry_sec=kwargs.get("spn_cdx_retry_sec", 9.0)
+ )
self.max_spn_manifest = 20
def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
@@ -222,25 +243,31 @@ class WebFilesetStrategy(FilesetIngestStrategy):
fetch_url = m.platform_url
if not fetch_url:
raise NotImplementedError(
- "require 'platform_url' for each file when doing Web fetching")
+ "require 'platform_url' for each file when doing Web fetching"
+ )
via = "wayback"
resource = self.wayback_client.lookup_resource(fetch_url, m.mimetype)
- if self.try_spn2 and (resource is None or
- (resource and resource.status == 'no-capture')):
+ if self.try_spn2 and (
+ resource is None or (resource and resource.status == "no-capture")
+ ):
if len(item.manifest) > self.max_spn_manifest:
- m.status = 'too-much-spn'
+ m.status = "too-much-spn"
continue
via = "spn2"
- resource = self.spn_client.crawl_resource(fetch_url,
- self.wayback_client,
- force_simple_get=True)
-
- print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status),
- (resource and resource.terminal_url)
- or fetch_url),
- file=sys.stderr)
+ resource = self.spn_client.crawl_resource(
+ fetch_url, self.wayback_client, force_simple_get=True
+ )
+
+ print(
+ "[FETCH {:>6}] {} {}".format(
+ via,
+ (resource and resource.status),
+ (resource and resource.terminal_url) or fetch_url,
+ ),
+ file=sys.stderr,
+ )
m.terminal_url = resource.terminal_url
m.terminal_dt = resource.terminal_dt
@@ -248,7 +275,7 @@ class WebFilesetStrategy(FilesetIngestStrategy):
if self.ingest_strategy == "web-file":
file_resource = resource
- if resource.status != 'success':
+ if resource.status != "success":
continue
else:
assert resource.terminal_status_code == 200
@@ -259,24 +286,26 @@ class WebFilesetStrategy(FilesetIngestStrategy):
if self.ingest_strategy == "web-file":
file_file_meta = file_meta
- if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex']
- ) or (m.sha1
- and m.sha1 != file_meta['sha1hex']):
- m.status = 'mismatch'
+ if (
+ file_meta["size_bytes"] != m.size
+ or (m.md5 and m.md5 != file_meta["md5hex"])
+ or (m.sha1 and m.sha1 != file_meta["sha1hex"])
+ ):
+ m.status = "mismatch"
continue
- m.md5 = m.md5 or file_meta['md5hex']
- m.sha1 = m.sha1 or file_meta['md5hex']
- m.sha256 = m.sha256 or file_meta['sha256hex']
- m.mimetype = m.mimetype or file_meta['mimetype']
+ m.md5 = m.md5 or file_meta["md5hex"]
+ m.sha1 = m.sha1 or file_meta["md5hex"]
+ m.sha256 = m.sha256 or file_meta["sha256hex"]
+ m.mimetype = m.mimetype or file_meta["mimetype"]
overall_status = self.success_status
for m in item.manifest:
- if m.status != 'success':
- overall_status = m.status or 'not-processed'
+ if m.status != "success":
+ overall_status = m.status or "not-processed"
break
if not item.manifest:
- overall_status = 'empty-manifest'
+ overall_status = "empty-manifest"
result = ArchiveStrategyResult(
ingest_strategy=self.ingest_strategy,
diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py
index f543ede..3398833 100644
--- a/python/sandcrawler/fileset_types.py
+++ b/python/sandcrawler/fileset_types.py
@@ -62,6 +62,7 @@ class PlatformScopeError(Exception):
- a 'latest version' record, when the platform has version-specific records
- a single file within a dataset for a platform which has file-level identifiers
"""
+
pass
@@ -69,4 +70,5 @@ class PlatformRestrictedError(Exception):
"""
When datasets are not publicly available on a platform (yet)
"""
+
pass
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index 67aca17..26918f6 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -12,11 +12,11 @@ from .workers import SandcrawlerFetchWorker, SandcrawlerWorker
class GrobidClient(object):
def __init__(self, host_url: str = "http://grobid.qa.fatcat.wiki", **kwargs):
self.host_url = host_url
- self.consolidate_mode = int(kwargs.get('consolidate_mode', 0))
+ self.consolidate_mode = int(kwargs.get("consolidate_mode", 0))
- def process_fulltext(self,
- blob: bytes,
- consolidate_mode: Optional[int] = None) -> Dict[str, Any]:
+ def process_fulltext(
+ self, blob: bytes, consolidate_mode: Optional[int] = None
+ ) -> Dict[str, Any]:
"""
Returns dict with keys:
- status_code
@@ -36,72 +36,75 @@ class GrobidClient(object):
grobid_response = requests.post(
self.host_url + "/api/processFulltextDocument",
files={
- 'input': blob,
- 'consolidateHeader': consolidate_mode,
- 'consolidateCitations': 0, # too expensive for now
- 'includeRawCitations': 1,
+ "input": blob,
+ "consolidateHeader": consolidate_mode,
+ "consolidateCitations": 0, # too expensive for now
+ "includeRawCitations": 1,
},
timeout=180.0,
)
except requests.Timeout:
return {
- 'status': 'error-timeout',
- 'status_code': -4, # heritrix3 "HTTP timeout" code
- 'error_msg': 'GROBID request (HTTP POST) timeout',
+ "status": "error-timeout",
+ "status_code": -4, # heritrix3 "HTTP timeout" code
+ "error_msg": "GROBID request (HTTP POST) timeout",
}
info: Dict[str, Any] = dict(status_code=grobid_response.status_code)
if grobid_response.status_code == 200:
- info['status'] = 'success'
- info['tei_xml'] = grobid_response.text
- if len(info['tei_xml']) > 12000000:
+ info["status"] = "success"
+ info["tei_xml"] = grobid_response.text
+ if len(info["tei_xml"]) > 12000000:
# XML is larger than Kafka message size, and much larger than
# an article in general; bail out
- info['status'] = 'error'
- info['error_msg'] = "response XML too large: {} bytes".format(
- len(info['tei_xml']))
- info.pop('tei_xml')
+ info["status"] = "error"
+ info["error_msg"] = "response XML too large: {} bytes".format(
+ len(info["tei_xml"])
+ )
+ info.pop("tei_xml")
else:
# response.text is .content decoded as utf-8
- info['status'] = 'error'
- info['error_msg'] = grobid_response.text[:10000]
+ info["status"] = "error"
+ info["error_msg"] = grobid_response.text[:10000]
return info
def metadata(self, result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
- if result['status'] != 'success':
+ if result["status"] != "success":
return None
- tei_json = teixml2json(result['tei_xml'], encumbered=False)
+ tei_json = teixml2json(result["tei_xml"], encumbered=False)
meta = dict()
biblio = dict()
for k in (
- 'title',
- 'authors',
- 'journal',
- 'date',
- 'doi',
+ "title",
+ "authors",
+ "journal",
+ "date",
+ "doi",
):
if tei_json.get(k):
biblio[k] = tei_json[k]
- meta['biblio'] = biblio
- for k in ('grobid_version', 'grobid_timestamp', 'fatcat_release', 'language_code'):
+ meta["biblio"] = biblio
+ for k in ("grobid_version", "grobid_timestamp", "fatcat_release", "language_code"):
if tei_json.get(k):
meta[k] = tei_json[k]
return meta
class GrobidWorker(SandcrawlerFetchWorker):
- def __init__(self,
- grobid_client: GrobidClient,
- wayback_client: Optional[WaybackClient] = None,
- sink: Optional[SandcrawlerWorker] = None,
- **kwargs):
+ def __init__(
+ self,
+ grobid_client: GrobidClient,
+ wayback_client: Optional[WaybackClient] = None,
+ sink: Optional[SandcrawlerWorker] = None,
+ **kwargs
+ ):
super().__init__(wayback_client=wayback_client)
self.grobid_client = grobid_client
self.sink = sink
self.consolidate_mode = 0
def timeout_response(self, task: Any) -> Any:
- default_key = task['sha1hex']
+ default_key = task["sha1hex"]
return dict(
status="error-timeout",
error_msg="internal GROBID worker timeout",
@@ -111,16 +114,17 @@ class GrobidWorker(SandcrawlerFetchWorker):
def process(self, record: Any, key: Optional[str] = None) -> Any:
fetch_result = self.fetch_blob(record)
- if fetch_result['status'] != 'success':
+ if fetch_result["status"] != "success":
return fetch_result
- blob: bytes = fetch_result['blob']
+ blob: bytes = fetch_result["blob"]
assert blob and isinstance(blob, bytes)
- result = self.grobid_client.process_fulltext(blob,
- consolidate_mode=self.consolidate_mode)
- result['file_meta'] = gen_file_metadata(blob)
- result['source'] = record
- result['key'] = result['file_meta']['sha1hex']
+ result = self.grobid_client.process_fulltext(
+ blob, consolidate_mode=self.consolidate_mode
+ )
+ result["file_meta"] = gen_file_metadata(blob)
+ result["source"] = record
+ result["key"] = result["file_meta"]["sha1hex"]
return result
@@ -129,10 +133,10 @@ class GrobidBlobWorker(SandcrawlerWorker):
This is sort of like GrobidWorker, except it receives blobs directly,
instead of fetching blobs from some remote store.
"""
- def __init__(self,
- grobid_client: GrobidClient,
- sink: Optional[SandcrawlerWorker] = None,
- **kwargs):
+
+ def __init__(
+ self, grobid_client: GrobidClient, sink: Optional[SandcrawlerWorker] = None, **kwargs
+ ):
super().__init__()
self.grobid_client = grobid_client
self.sink = sink
@@ -141,8 +145,9 @@ class GrobidBlobWorker(SandcrawlerWorker):
def process(self, blob: Any, key: Optional[str] = None) -> Any:
if not blob:
return None
- result = self.grobid_client.process_fulltext(blob,
- consolidate_mode=self.consolidate_mode)
- result['file_meta'] = gen_file_metadata(blob)
- result['key'] = result['file_meta']['sha1hex']
+ result = self.grobid_client.process_fulltext(
+ blob, consolidate_mode=self.consolidate_mode
+ )
+ result["file_meta"] = gen_file_metadata(blob)
+ result["key"] = result["file_meta"]["sha1hex"]
return result
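A minimal usage sketch of the client reformatted above, based on its docstring (the result carries status_code, status, and either tei_xml or error_msg). The host URL and file path are placeholders:

from sandcrawler.grobid import GrobidClient

client = GrobidClient(host_url="http://localhost:8070")  # placeholder GROBID host

with open("paper.pdf", "rb") as f:  # placeholder path
    blob = f.read()

result = client.process_fulltext(blob, consolidate_mode=0)
if result["status"] == "success":
    # metadata() parses the TEI-XML into a small biblio dict
    print(client.metadata(result))
else:
    print(result["status_code"], result.get("error_msg"))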
diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py
index 4d36573..5fba963 100644
--- a/python/sandcrawler/html.py
+++ b/python/sandcrawler/html.py
@@ -7,7 +7,8 @@ from typing import Dict
from bs4 import BeautifulSoup
RESEARCHSQUARE_REGEX = re.compile(
- r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"')
+ r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"'
+)
IEEEXPLORE_REGEX = re.compile(r'"pdfPath":"(/.*?\.pdf)"')
OVID_JOURNAL_URL_REGEX = re.compile(r'journalURL = "(http.*)";')
SCIENCEDIRECT_BOUNCE_URL_REGEX = re.compile(r"window.location = '(http.*)';")
@@ -21,9 +22,9 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]:
On error, or if fails to extract a URL, returns an empty dict.
"""
- host_prefix = '/'.join(html_url.split('/')[:3])
+ host_prefix = "/".join(html_url.split("/")[:3])
try:
- soup = BeautifulSoup(html_body, 'html.parser')
+ soup = BeautifulSoup(html_body, "html.parser")
except TypeError as te:
print(f"{te} (url={html_url})", file=sys.stderr)
return dict()
@@ -34,80 +35,86 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]:
### General Tricks ###
# highwire-style meta tag
- meta = soup.find('meta', attrs={"name": "citation_pdf_url"})
+ meta = soup.find("meta", attrs={"name": "citation_pdf_url"})
if not meta:
- meta = soup.find('meta', attrs={"name": "bepress_citation_pdf_url"})
+ meta = soup.find("meta", attrs={"name": "bepress_citation_pdf_url"})
if not meta:
- meta = soup.find('meta', attrs={"name": "wkhealth_pdf_url"})
+ meta = soup.find("meta", attrs={"name": "wkhealth_pdf_url"})
if not meta:
# researchgate does this; maybe others also?
- meta = soup.find('meta', attrs={"property": "citation_pdf_url"})
+ meta = soup.find("meta", attrs={"property": "citation_pdf_url"})
if not meta:
- meta = soup.find('meta', attrs={"name": "eprints.document_url"})
+ meta = soup.find("meta", attrs={"name": "eprints.document_url"})
# if tag is only partially populated
- if meta and not meta.get('content'):
+ if meta and not meta.get("content"):
meta = None
# wiley has a weird almost-blank page we don't want to loop on
if meta and "://onlinelibrary.wiley.com/doi/pdf/" not in html_url:
- url = meta['content'].strip()
- if '://doi.org/' in url:
+ url = meta["content"].strip()
+ if "://doi.org/" in url:
print(f"\tdoi.org in citation_pdf_url (loop?): {url}", file=sys.stderr)
- elif url.startswith('/'):
+ elif url.startswith("/"):
if host_prefix + url == html_url:
print("\tavoiding citation_pdf_url link-loop", file=sys.stderr)
else:
- return dict(pdf_url=host_prefix + url, technique='citation_pdf_url')
- elif url.startswith('http'):
+ return dict(pdf_url=host_prefix + url, technique="citation_pdf_url")
+ elif url.startswith("http"):
if url == html_url:
print("\tavoiding citation_pdf_url link-loop", file=sys.stderr)
else:
- return dict(pdf_url=url, technique='citation_pdf_url')
+ return dict(pdf_url=url, technique="citation_pdf_url")
else:
print("\tmalformed citation_pdf_url? {}".format(url), file=sys.stderr)
- meta = soup.find('meta', attrs={"name": "generator"})
+ meta = soup.find("meta", attrs={"name": "generator"})
meta_generator = None
- if meta and meta.get('content'):
- meta_generator = meta['content'].strip()
+ if meta and meta.get("content"):
+ meta_generator = meta["content"].strip()
### Publisher/Platform Specific ###
# research square (researchsquare.com)
- if 'researchsquare.com/article/' in html_url:
+ if "researchsquare.com/article/" in html_url:
# JSON in body with a field like:
# "url":"https://assets.researchsquare.com/files/4a57970e-b002-4608-b507-b95967649483/v2/Manuscript.pdf"
- m = RESEARCHSQUARE_REGEX.search(html_body.decode('utf-8'))
+ m = RESEARCHSQUARE_REGEX.search(html_body.decode("utf-8"))
if m:
url = m.group(1)
assert len(url) < 4096
- return dict(release_stage="manuscript", pdf_url=url, technique='publisher')
+ return dict(release_stage="manuscript", pdf_url=url, technique="publisher")
# elsevier linking hub
# https://linkinghub.elsevier.com/retrieve/pii/S1569199319308975
- if '://linkinghub.elsevier.com/retrieve/pii/' in html_url:
+ if "://linkinghub.elsevier.com/retrieve/pii/" in html_url:
# <input type="hidden" name="redirectURL" value="http%3A%2F%2Fcysticfibrosisjournal.com%2Fretrieve%2Fpii%2FS1569199319308975" id="redirectURL"/>
redirect = soup.find("input", attrs={"name": "redirectURL"})
if redirect:
- url = redirect['value'].strip()
- if 'http' in url:
+ url = redirect["value"].strip()
+ if "http" in url:
url = urllib.parse.unquote(url)
# drop any query parameters
- url = url.split('?via')[0]
+ url = url.split("?via")[0]
return dict(next_url=url, technique="elsevier-linkinghub")
# sciencedirect PDF URL extract
# https://www.sciencedirect.com/science/article/pii/S0169204621000670
- if 'sciencedirect.com/science/article/pii/' in html_url and not html_url.endswith(".pdf"):
+ if "sciencedirect.com/science/article/pii/" in html_url and not html_url.endswith(".pdf"):
json_tag = soup.find("script", attrs={"type": "application/json", "data-iso-key": "_0"})
url = None
if json_tag:
try:
json_text = json_tag.string
json_meta = json.loads(json_text)
- pdf_meta = json_meta['article']['pdfDownload']['urlMetadata']
+ pdf_meta = json_meta["article"]["pdfDownload"]["urlMetadata"]
# https://www.sciencedirect.com/science/article/pii/S0169204621000670/pdfft?md5=c4a83d06b334b627ded74cf9423bfa56&pid=1-s2.0-S0169204621000670-main.pdf
- url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams'][
- 'md5'] + "&pid=" + pdf_meta['queryParams']['pid']
+ url = (
+ html_url
+ + pdf_meta["pdfExtension"]
+ + "?md5="
+ + pdf_meta["queryParams"]["md5"]
+ + "&pid="
+ + pdf_meta["queryParams"]["pid"]
+ )
except (KeyError, TypeError, json.JSONDecodeError):
pass
if url:
@@ -115,9 +122,9 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]:
# sciencedirect PDF bounce page
# https://www.sciencedirect.com/science/article/pii/S2590109519300424/pdfft?md5=854f43a44de186eb58674b8e20631691&pid=1-s2.0-S2590109519300424-main.pdf
- if '://www.sciencedirect.com/' in html_url and html_url.endswith(".pdf"):
+ if "://www.sciencedirect.com/" in html_url and html_url.endswith(".pdf"):
# window.location = 'https://pdf.sciencedirectassets.com/320270/AIP/1-s2.0-S2590109519300424/main.pdf?X-Amz-Security-Token=[...]&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20200110T210936Z&X-Amz-SignedHeaders=host&X-Amz-Expires=300&X-Amz-Credential=ASIAQ3PHCVTY23CMDBNC%2F20200110%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Signature=[...]&hash=[...]&host=[...]&pii=S2590109519300424&tid=spdf-74468ebd-6be6-43ac-b294-ced86e8eea58&sid=[...]&type=client';
- m = SCIENCEDIRECT_BOUNCE_URL_REGEX.search(html_body.decode('utf-8'))
+ m = SCIENCEDIRECT_BOUNCE_URL_REGEX.search(html_body.decode("utf-8"))
if m:
url = m.group(1)
assert len(url) < 4000
@@ -125,34 +132,34 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]:
# ieeexplore.ieee.org
# https://ieeexplore.ieee.org/document/8730316
- if '://ieeexplore.ieee.org/document/' in html_url:
+ if "://ieeexplore.ieee.org/document/" in html_url:
# JSON in body with a field like:
# "pdfPath":"/iel7/6287639/8600701/08730316.pdf",
- m = IEEEXPLORE_REGEX.search(html_body.decode('utf-8'))
+ m = IEEEXPLORE_REGEX.search(html_body.decode("utf-8"))
if m:
url = m.group(1)
assert len(url) < 4096
- return dict(release_stage="published",
- pdf_url=host_prefix + url,
- technique="ieeexplore")
+ return dict(
+ release_stage="published", pdf_url=host_prefix + url, technique="ieeexplore"
+ )
# https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=8730313
- if '://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber' in html_url:
+ if "://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber" in html_url:
# HTML iframe like:
# <iframe src="http://web.archive.org/web/20191026011528if_/https://ieeexplore.ieee.org/ielx7/6287639/8600701/08730313.pdf?tp=&amp;arnumber=8730313&amp;isnumber=8600701&amp;ref=" frameborder="0"></iframe>
iframe = soup.find("iframe")
- if iframe and '.pdf' in iframe['src']:
- return dict(pdf_url=iframe['src'], technique="iframe")
+ if iframe and ".pdf" in iframe["src"]:
+ return dict(pdf_url=iframe["src"], technique="iframe")
# https://insights.ovid.com/crossref?an=00042307-202001000-00013
# Ovid is some kind of landing page bounce portal tracking run-around.
# Can extract actual journal URL from javascript blob in the HTML
- if '://insights.ovid.com/crossref' in html_url:
+ if "://insights.ovid.com/crossref" in html_url:
# var journalURL = "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689";
- m = OVID_JOURNAL_URL_REGEX.search(html_body.decode('utf-8'))
+ m = OVID_JOURNAL_URL_REGEX.search(html_body.decode("utf-8"))
if m:
url = m.group(1)
assert len(url) < 4096
- return dict(next_url=url, technique='ovid')
+ return dict(next_url=url, technique="ovid")
# osf.io
# https://osf.io/8phvx/
@@ -160,41 +167,44 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]:
# wow, they ship total javascript crud! going to just guess download URL
# based on URL for now. Maybe content type header would help?
OSF_DOMAINS = [
- '://osf.io/',
- '://biohackrxiv.org/',
- '://psyarxiv.com/',
- '://arabixiv.org/',
- '://engrxiv.org/',
- '://edarxiv.org//',
- '://ecsarxiv.org/',
- '://ecoevorxiv.org/',
- '://frenxiv.org/',
- '://indiarxiv.org/',
- '://mindrxiv.org/',
- '://mediarxiv.org/',
- '://paleorxiv.org/',
- '://thesiscommons.org/',
+ "://osf.io/",
+ "://biohackrxiv.org/",
+ "://psyarxiv.com/",
+ "://arabixiv.org/",
+ "://engrxiv.org/",
+ "://edarxiv.org//",
+ "://ecsarxiv.org/",
+ "://ecoevorxiv.org/",
+ "://frenxiv.org/",
+ "://indiarxiv.org/",
+ "://mindrxiv.org/",
+ "://mediarxiv.org/",
+ "://paleorxiv.org/",
+ "://thesiscommons.org/",
]
for domain in OSF_DOMAINS:
- if domain in html_url and (len(html_url.split('/')) in [4, 5] or '/preprints/'
- in html_url) and '/download' not in html_url:
+ if (
+ domain in html_url
+ and (len(html_url.split("/")) in [4, 5] or "/preprints/" in html_url)
+ and "/download" not in html_url
+ ):
if not html_url.endswith("/"):
next_url = html_url + "/download"
else:
next_url = html_url + "download"
- return dict(next_url=next_url, technique='osf-by-url')
+ return dict(next_url=next_url, technique="osf-by-url")
# wiley
# https://onlinelibrary.wiley.com/doi/pdf/10.1111/1467-923X.12787
if "://onlinelibrary.wiley.com/doi/pdf/" in html_url:
if b"/doi/pdfdirect/" in html_body:
- next_url = html_url.replace('/doi/pdf/', '/doi/pdfdirect/')
- return dict(next_url=next_url, technique='wiley-pdfdirect')
+ next_url = html_url.replace("/doi/pdf/", "/doi/pdfdirect/")
+ return dict(next_url=next_url, technique="wiley-pdfdirect")
# arxiv abstract pages
if "://arxiv.org/abs/" in html_url:
url = html_url.replace("/abs/", "/pdf/")
- return dict(pdf_url=url, technique='arxiv-url')
+ return dict(pdf_url=url, technique="arxiv-url")
# american archivist (OA)
# https://americanarchivist.org/doi/abs/10.17723/aarc.62.2.j475270470145630
@@ -202,28 +212,28 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]:
# use a more aggressive direct guess to avoid rate-limiting...
if "/doi/10." in html_url:
url = html_url.replace("/doi/10.", "/doi/pdf/10.")
- return dict(pdf_url=url, technique='archivist-url')
+ return dict(pdf_url=url, technique="archivist-url")
# <a href="/doi/pdf/10.17723/aarc.62.2.j475270470145630" target="_blank">
- hrefs = soup.find_all('a', attrs={"target": "_blank"})
+ hrefs = soup.find_all("a", attrs={"target": "_blank"})
for href in hrefs:
- url = href['href'].strip()
+ url = href["href"].strip()
if "/doi/pdf/" in url:
- if url.startswith('http'):
- return dict(pdf_url=url, technique='publisher-href')
- elif url.startswith('/'):
- return dict(pdf_url=host_prefix + url, technique='publisher-href')
+ if url.startswith("http"):
+ return dict(pdf_url=url, technique="publisher-href")
+ elif url.startswith("/"):
+ return dict(pdf_url=host_prefix + url, technique="publisher-href")
# protocols.io
# https://www.protocols.io/view/flow-cytometry-protocol-mgdc3s6
if "://www.protocols.io/view/" in html_url and not html_url.endswith(".pdf"):
url = html_url + ".pdf"
- return dict(pdf_url=url, technique='protocolsio-url')
+ return dict(pdf_url=url, technique="protocolsio-url")
# degruyter.com
# https://www.degruyter.com/view/books/9783486594621/9783486594621-009/9783486594621-009.xml
if "://www.degruyter.com/view/" in html_url and html_url.endswith(".xml"):
- url = html_url.replace('/view/', '/downloadpdf/').replace('.xml', '.pdf')
- return dict(pdf_url=url, technique='degruyter-url')
+ url = html_url.replace("/view/", "/downloadpdf/").replace(".xml", ".pdf")
+ return dict(pdf_url=url, technique="degruyter-url")
# journals.lww.com (Wolters Kluwer)
# https://journals.lww.com/spinejournal/Abstract/publishahead/Making_the_Most_of_Systematic_Reviews_and.94318.aspx
@@ -231,31 +241,32 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]:
# we never get the content.
if "://journals.lww.com/" in html_url and False:
# data-pdf-url="https://pdfs.journals.lww.com/spinejournal/9000/00000/Making_the_Most_of_Systematic_Reviews_and.94318.pdf?token=method|ExpireAbsolute;source|Journals;ttl|1582413672903;payload|mY8D3u1TCCsNvP5E421JYK6N6XICDamxByyYpaNzk7FKjTaa1Yz22MivkHZqjGP4kdS2v0J76WGAnHACH69s21Csk0OpQi3YbjEMdSoz2UhVybFqQxA7lKwSUlA502zQZr96TQRwhVlocEp/sJ586aVbcBFlltKNKo+tbuMfL73hiPqJliudqs17cHeLcLbV/CqjlP3IO0jGHlHQtJWcICDdAyGJMnpi6RlbEJaRheGeh5z5uvqz3FLHgPKVXJzdiVgCTnUeUQFYzcJRFhNtc2gv+ECZGji7HUicj1/6h85Y07DBRl1x2MGqlHWXUawD;hash|6cqYBa15ZK407m4VhFfJLw=="
- for line in html_body.split(b'\n'):
+ for line in html_body.split(b"\n"):
if b"data-pdf-url=" in line:
- line = line.decode('utf-8')
- url = line.strip().replace('data-pdf-url=', '').replace('"', '')
- if url.startswith('http') and 'pdfs.journals.lww.com' in url:
- return dict(pdf_url=url, technique='journals.lww.com-jsvar')
+ line = line.decode("utf-8")
+ url = line.strip().replace("data-pdf-url=", "").replace('"', "")
+ if url.startswith("http") and "pdfs.journals.lww.com" in url:
+ return dict(pdf_url=url, technique="journals.lww.com-jsvar")
# www.ahajournals.org
# https://www.ahajournals.org/doi/10.1161/circ.110.19.2977
- if "://www.ahajournals.org/doi/" in html_url and '/doi/pdf/' not in html_url:
+ if "://www.ahajournals.org/doi/" in html_url and "/doi/pdf/" not in html_url:
# <a href="/doi/pdf/10.1161/circ.110.19.2977?download=true">PDF download</a>
- if b'/doi/pdf/10.' in html_body:
- url = html_url.replace('/doi/10.', '/doi/pdf/10.')
+ if b"/doi/pdf/10." in html_body:
+ url = html_url.replace("/doi/10.", "/doi/pdf/10.")
url = url + "?download=true"
- return dict(pdf_url=url, technique='ahajournals-url')
+ return dict(pdf_url=url, technique="ahajournals-url")
# ehp.niehs.nih.gov
# https://ehp.niehs.nih.gov/doi/full/10.1289/EHP4709
# https://ehp.niehs.nih.gov/doi/10.1289/ehp.113-a51
if "://ehp.niehs.nih.gov/doi/" in html_url:
# <a href="/doi/pdf/10.1289/EHP4709" target="_blank">
- if b'/doi/pdf/10.' in html_body:
- url = html_url.replace('/doi/full/10.',
- '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.')
- return dict(pdf_url=url, technique='ehp.niehs.nih.gov-url')
+ if b"/doi/pdf/10." in html_body:
+ url = html_url.replace("/doi/full/10.", "/doi/pdf/10.").replace(
+ "/doi/10.", "/doi/pdf/10."
+ )
+ return dict(pdf_url=url, technique="ehp.niehs.nigh.gov-url")
# cogentoa.com
# https://www.cogentoa.com/article/10.1080/23311975.2017.1412873
@@ -263,75 +274,75 @@ def extract_fulltext_url(html_url: str, html_body: bytes) -> Dict[str, str]:
# blech, it's a SPA! All JS
# https://www.cogentoa.com/article/10.1080/23311975.2017.1412873.pdf
url = html_url + ".pdf"
- return dict(pdf_url=url, technique='cogentoa-url')
+ return dict(pdf_url=url, technique="cogentoa-url")
# chemrxiv.org (likely to be other figshare domains also)
# https://chemrxiv.org/articles/Biradical_Formation_by_Deprotonation_in_Thiazole-Derivatives_The_Hidden_Nature_of_Dasatinib/10101419
- if "://chemrxiv.org/articles/" in html_url or '.figshare.org/articles/' in html_url:
+ if "://chemrxiv.org/articles/" in html_url or ".figshare.org/articles/" in html_url:
# <script id="app-data" type="text/json"> [...] </script>
- json_tag = soup.find('script', id="app-data", attrs={"type": "text/json"})
+ json_tag = soup.find("script", id="app-data", attrs={"type": "text/json"})
if json_tag and json_tag.string:
app_data = json.loads(json_tag.string)
# "exportPdfDownloadUrl": "https://s3-eu-west-1.amazonaws.com/itempdf74155353254prod/10101419/Biradical_Formation_by_Deprotonation_in_Thiazole-Derivatives__The_Hidden_Nature_of_Dasatinib_v1.pdf"
- url = app_data.get('article', {}).get('exportPdfDownloadUrl')
- if url and url.startswith('http'):
- return dict(pdf_url=url, technique='figshare-json')
+ url = app_data.get("article", {}).get("exportPdfDownloadUrl")
+ if url and url.startswith("http"):
+ return dict(pdf_url=url, technique="figshare-json")
# CNKI COVID-19 landing pages
# http://en.gzbd.cnki.net/gzbt/detail/detail.aspx?FileName=HBGF202002003&DbName=GZBJ7920&DbCode=GZBJ
- if '://en.gzbd.cnki.net/KCMS/detail/detail.aspx' in html_url:
+ if "://en.gzbd.cnki.net/KCMS/detail/detail.aspx" in html_url:
# <a onclick="WriteKrsDownLog()" target="_blank" id="pdfDown" name="pdfDown" href="/gzbt/download.aspx?filename=4Q1ZYpFdKFUZ6FDR1QkRrolayRXV2ZzattyQ3QFa2JXTyZXUSV3QRFkbndzaGV2KyJXWZVEbFdVYnZndD9EOxg1Tj5Eeys2SMFzLZ5kcuFkM3dEbsR2ZjxEaShVdJhFdp90KhlVVzcjVVlXUVNHWBtWS5Rlb5cnc&amp;tablename=GZBJLAST2020&amp;dflag=pdfdown&#xA; "><i></i>PDF Download</a>
- href = soup.find('a', attrs={"id": "pdfDown"})
+ href = soup.find("a", attrs={"id": "pdfDown"})
if href:
- url = href['href'].strip().replace('&#xA;', '')
- if not url.startswith('http'):
+ url = href["href"].strip().replace("&#xA;", "")
+ if not url.startswith("http"):
url = host_prefix + url
- return dict(pdf_url=url, technique='cnki-href')
+ return dict(pdf_url=url, technique="cnki-href")
# RWTH AACHEN repository
- if '://publications.rwth-aachen.de/record/' in html_url:
- record_id = html_url.split('/')[-1]
+ if "://publications.rwth-aachen.de/record/" in html_url:
+ record_id = html_url.split("/")[-1]
url = f"{html_url}/files/{record_id}.pdf"
- if record_id.isdigit() and url.encode('utf-8') in html_body:
- return dict(pdf_url=url, technique='rwth-aachen-url')
+ if record_id.isdigit() and url.encode("utf-8") in html_body:
+ return dict(pdf_url=url, technique="rwth-aachen-url")
# physchemaspects.ru
- if '://physchemaspects.ru/' in html_url and soup:
- for href in soup.find_all('a'):
+ if "://physchemaspects.ru/" in html_url and soup:
+ for href in soup.find_all("a"):
if href.text == "download PDF file":
- url = href['href']
- if url.startswith('/'):
+ url = href["href"]
+ if url.startswith("/"):
url = host_prefix + url
- return dict(pdf_url=url, technique='physchemaspects-href')
+ return dict(pdf_url=url, technique="physchemaspects-href")
# OJS 3 (some)
if meta_generator and meta_generator.startswith("Open Journal Systems"):
- href = soup.find('a', attrs={"class": "obj_galley_link file"})
+ href = soup.find("a", attrs={"class": "obj_galley_link file"})
if href and href.text and "pdf" in href.text.lower():
- url = href['href'].strip()
- if url.startswith('/'):
+ url = href["href"].strip()
+ if url.startswith("/"):
url = host_prefix + url
- return dict(pdf_url=url, technique='ojs-galley-href')
+ return dict(pdf_url=url, technique="ojs-galley-href")
# ETH zurich e-periodica
- if '://www.e-periodica.ch/digbib/view' in html_url:
- url = html_url.replace('digbib/view', 'cntmng').split('#')[0]
- if url.encode('utf-8') in html_body:
- return dict(pdf_url=url, technique='href-eperiodica')
+ if "://www.e-periodica.ch/digbib/view" in html_url:
+ url = html_url.replace("digbib/view", "cntmng").split("#")[0]
+ if url.encode("utf-8") in html_body:
+ return dict(pdf_url=url, technique="href-eperiodica")
# JMIR
# https://mhealth.jmir.org/2020/7/e17891/
- if '.jmir.org/' in html_url and "/pdf" not in html_url and html_url.endswith("/"):
+ if ".jmir.org/" in html_url and "/pdf" not in html_url and html_url.endswith("/"):
url = html_url + "pdf"
- return dict(pdf_url=url, technique='jmir-url')
+ return dict(pdf_url=url, technique="jmir-url")
### below here we are doing guesses
# generic guess: try current URL plus .pdf, if it exists in the HTML body
- if '.pdf' not in html_url:
+ if ".pdf" not in html_url:
url = html_url + ".pdf"
- if url.encode('utf-8') in html_body:
- return dict(pdf_url=url, technique='guess-url-plus-pdf')
+ if url.encode("utf-8") in html_body:
+ return dict(pdf_url=url, technique="guess-url-plus-pdf")
return dict()
@@ -343,8 +354,10 @@ def test_regex() -> None:
asdf"""
m = OVID_JOURNAL_URL_REGEX.search(lines)
assert m
- assert m.group(
- 1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"
+ assert (
+ m.group(1)
+ == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"
+ )
lines = """
window.onload = function () {
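A short sketch of calling the extractor above; per its docstring it returns an empty dict on failure, otherwise a technique label plus either a direct pdf_url or a next_url to follow. The URL and HTML body are placeholders:

from sandcrawler.html import extract_fulltext_url

html_url = "https://example.com/article/123"  # placeholder
html_body = b'<html><head><meta name="citation_pdf_url" content="https://example.com/article/123.pdf"></head></html>'

hit = extract_fulltext_url(html_url, html_body)
if hit.get("pdf_url"):
    print("direct PDF:", hit["pdf_url"], "via", hit["technique"])
elif hit.get("next_url"):
    print("follow:", hit["next_url"])
else:
    print("no fulltext URL found")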
diff --git a/python/sandcrawler/html_metadata.py b/python/sandcrawler/html_metadata.py
index e2e673f..1ab667c 100644
--- a/python/sandcrawler/html_metadata.py
+++ b/python/sandcrawler/html_metadata.py
@@ -30,7 +30,9 @@ HEAD_META_PATTERNS: Dict[str, List[str]] = {
"meta[name='dcterms.title']",
"meta[name='dc.title']",
],
- "subtitle": ["meta[name='prism.subtitle']", ],
+ "subtitle": [
+ "meta[name='prism.subtitle']",
+ ],
"doi": [
"meta[name='citation_doi']",
"meta[name='DOI']",
@@ -40,7 +42,9 @@ HEAD_META_PATTERNS: Dict[str, List[str]] = {
"meta[name='dc.identifier.doi']",
"meta[name='dc.identifier'][scheme='doi']",
],
- "pmid": ["meta[name='citation_pmid']", ],
+ "pmid": [
+ "meta[name='citation_pmid']",
+ ],
"abstract": [
"meta[name='citation_abstract']",
"meta[name='bepress_citation_abstract']",
@@ -61,7 +65,9 @@ HEAD_META_PATTERNS: Dict[str, List[str]] = {
"meta[name='dc.source']",
"meta[property='og:site_name']",
],
- "container_abbrev": ["meta[name='citation_journal_abbrev']", ],
+ "container_abbrev": [
+ "meta[name='citation_journal_abbrev']",
+ ],
"raw_date": [
"meta[name='citation_publication_date']",
"meta[name='bepress_citation_publication_date']",
@@ -162,7 +168,9 @@ HEAD_META_LIST_PATTERNS: Dict[str, List[str]] = {
"meta[name='dc.contributor']",
],
# TODO: citation_author_institution
- "raw_references": ["meta[name='citation_reference']", ],
+ "raw_references": [
+ "meta[name='citation_reference']",
+ ],
"raw_identifiers": [
"meta[name='eprints.id_number']",
"meta[name='dcterms.identifier']",
@@ -646,8 +654,9 @@ class BiblioMetadata(pydantic.BaseModel):
json_encoders = {datetime.date: lambda dt: dt.isoformat()}
-def html_extract_fulltext_url(doc_url: str, doc: HTMLParser,
- patterns: List[dict]) -> Optional[Tuple[str, str]]:
+def html_extract_fulltext_url(
+ doc_url: str, doc: HTMLParser, patterns: List[dict]
+) -> Optional[Tuple[str, str]]:
"""
Tries to quickly extract fulltext URLs using a set of patterns. This
function is intended to be generic across various extraction techniques.
@@ -656,36 +665,36 @@ def html_extract_fulltext_url(doc_url: str, doc: HTMLParser,
"""
self_doc_url: Optional[Tuple[str, str]] = None
for pattern in patterns:
- if 'selector' not in pattern:
+ if "selector" not in pattern:
continue
- if 'in_doc_url' in pattern:
- if pattern['in_doc_url'] not in doc_url:
+ if "in_doc_url" in pattern:
+ if pattern["in_doc_url"] not in doc_url:
continue
- elem = doc.css_first(pattern['selector'])
+ elem = doc.css_first(pattern["selector"])
if not elem:
continue
val = None
- if 'attr' in pattern:
- val = elem.attrs.get(pattern['attr'])
- elif pattern.get('use_body'):
+ if "attr" in pattern:
+ val = elem.attrs.get(pattern["attr"])
+ elif pattern.get("use_body"):
val = elem.text()
- if '://' not in val:
+ if "://" not in val:
continue
if not val:
continue
val = urllib.parse.urljoin(doc_url, val)
assert val
- if 'in_fulltext_url' in pattern:
- if pattern['in_fulltext_url'] not in val:
+ if "in_fulltext_url" in pattern:
+ if pattern["in_fulltext_url"] not in val:
continue
for skip_pattern in FULLTEXT_URL_PATTERNS_SKIP:
if skip_pattern in val.lower():
continue
if url_fuzzy_equal(doc_url, val):
# don't link to self, unless no other options
- self_doc_url = (val, pattern.get('technique', 'unknown'))
+ self_doc_url = (val, pattern.get("technique", "unknown"))
continue
- return (val, pattern.get('technique', 'unknown'))
+ return (val, pattern.get("technique", "unknown"))
if self_doc_url:
print(" WARN: returning fulltext URL pointing to self", file=sys.stderr)
return self_doc_url
@@ -703,9 +712,9 @@ def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadat
for field, patterns in HEAD_META_PATTERNS.items():
for pattern in patterns:
val = head.css_first(pattern)
- #print((field, pattern, val))
- if val and 'content' in val.attrs and val.attrs['content']:
- meta[field] = val.attrs['content']
+ # print((field, pattern, val))
+ if val and "content" in val.attrs and val.attrs["content"]:
+ meta[field] = val.attrs["content"]
break
for field, patterns in HEAD_META_LIST_PATTERNS.items():
@@ -713,53 +722,53 @@ def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadat
val_list = head.css(pattern)
if val_list:
for val in val_list:
- if 'content' in val.attrs and val.attrs['content']:
+ if "content" in val.attrs and val.attrs["content"]:
if field not in meta:
meta[field] = []
- meta[field].append(val.attrs['content'])
+ meta[field].append(val.attrs["content"])
break
# (some) fulltext extractions
pdf_fulltext_url = html_extract_fulltext_url(doc_url, doc, PDF_FULLTEXT_PATTERNS)
if pdf_fulltext_url:
- meta['pdf_fulltext_url'] = pdf_fulltext_url[0]
+ meta["pdf_fulltext_url"] = pdf_fulltext_url[0]
xml_fulltext_url = html_extract_fulltext_url(doc_url, doc, XML_FULLTEXT_PATTERNS)
if xml_fulltext_url:
- meta['xml_fulltext_url'] = xml_fulltext_url[0]
+ meta["xml_fulltext_url"] = xml_fulltext_url[0]
html_fulltext_url = html_extract_fulltext_url(doc_url, doc, HTML_FULLTEXT_PATTERNS)
if html_fulltext_url:
- meta['html_fulltext_url'] = html_fulltext_url[0]
+ meta["html_fulltext_url"] = html_fulltext_url[0]
component_url = html_extract_fulltext_url(doc_url, doc, COMPONENT_FULLTEXT_PATTERNS)
if component_url:
- meta['component_url'] = component_url[0]
+ meta["component_url"] = component_url[0]
# TODO: replace with clean_doi() et al
- if meta.get('doi') and meta.get('doi').startswith('doi:'):
- meta['doi'] = meta['doi'][4:]
+ if meta.get("doi") and meta.get("doi").startswith("doi:"):
+ meta["doi"] = meta["doi"][4:]
- raw_identifiers = meta.pop('raw_identifiers', [])
+ raw_identifiers = meta.pop("raw_identifiers", [])
for ident in raw_identifiers:
- if ident.startswith('doi:10.'):
- if 'doi' not in meta:
- meta['doi'] = ident.replace('doi:', '')
- elif ident.startswith('10.') and '/' in ident:
- if 'doi' not in meta:
- meta['doi'] = ident
- elif ident.startswith('isbn:'):
- if 'isbn' not in meta:
- meta['isbn'] = ident.replace('isbn:', '')
-
- raw_date = meta.pop('raw_date', None)
+ if ident.startswith("doi:10."):
+ if "doi" not in meta:
+ meta["doi"] = ident.replace("doi:", "")
+ elif ident.startswith("10.") and "/" in ident:
+ if "doi" not in meta:
+ meta["doi"] = ident
+ elif ident.startswith("isbn:"):
+ if "isbn" not in meta:
+ meta["isbn"] = ident.replace("isbn:", "")
+
+ raw_date = meta.pop("raw_date", None)
if raw_date:
parsed = dateparser.parse(raw_date)
if parsed:
- meta['release_date'] = parsed.date()
+ meta["release_date"] = parsed.date()
- raw_release_type = meta.pop('raw_release_type', None)
+ raw_release_type = meta.pop("raw_release_type", None)
if raw_release_type:
release_type = RELEASE_TYPE_MAP.get(raw_release_type.lower().strip())
if release_type:
- meta['release_type'] = release_type
+ meta["release_type"] = release_type
return BiblioMetadata(**meta)
@@ -786,29 +795,26 @@ def load_adblock_rules() -> braveblock.Adblocker:
"||pbs.twimg.com^",
"||badge.dimensions.ai^",
"||recaptcha.net^",
-
# not sure about these CC badges (usually via a redirect)
- #"||licensebuttons.net^",
- #"||i.creativecommons.org^",
-
+ # "||licensebuttons.net^",
+ # "||i.creativecommons.org^",
# Should we skip jquery, or other generic javascript CDNs?
- #"||code.jquery.com^",
- #"||ajax.googleapis.com^",
- #"||cdnjs.cloudflare.com^",
-
+ # "||code.jquery.com^",
+ # "||ajax.googleapis.com^",
+ # "||cdnjs.cloudflare.com^",
# badges, "share" buttons, tracking, etc
"apis.google.com/js/plusone",
"www.google.com/recaptcha/",
"js/_getUACode.js"
-
# PLOS images
"/resource/img/icon.*.16.png^",
],
)
-def _extract_generic(doc: HTMLParser, selector: str, attrs: List[str],
- type_name: str) -> List[Dict[str, str]]:
+def _extract_generic(
+ doc: HTMLParser, selector: str, attrs: List[str], type_name: str
+) -> List[Dict[str, str]]:
resources = []
for node in doc.css(selector):
@@ -818,21 +824,22 @@ def _extract_generic(doc: HTMLParser, selector: str, attrs: List[str],
url = node.attrs.get(attr)
# special-case a couple meta URI prefixes which don't match with adblock rules
skip = False
- for prefix in ['about:', 'data:', 'magnet:', 'urn:', 'mailto:']:
+ for prefix in ["about:", "data:", "magnet:", "urn:", "mailto:"]:
if url and url.startswith(prefix):
skip = True
break
if skip:
continue
if url:
- #print(url, file=sys.stderr)
+ # print(url, file=sys.stderr)
resources.append(dict(url=url.strip(), type=type_name))
return resources
-def html_extract_resources(doc_url: str, doc: HTMLParser,
- adblock: braveblock.Adblocker) -> List[Dict[str, str]]:
+def html_extract_resources(
+ doc_url: str, doc: HTMLParser, adblock: braveblock.Adblocker
+) -> List[Dict[str, str]]:
"""
This function tries to find all the important resources in a page. The
presumption is that the HTML document is article fulltext, and we want the
@@ -860,12 +867,14 @@ def html_extract_resources(doc_url: str, doc: HTMLParser,
# ensure URLs are absolute
for r in resources:
- r['url'] = urllib.parse.urljoin(doc_url, r['url'])
+ r["url"] = urllib.parse.urljoin(doc_url, r["url"])
# filter using adblocker
resources = [
- r for r in resources if adblock.check_network_urls(
- r['url'], source_url=doc_url, request_type=r['type']) is False
+ r
+ for r in resources
+ if adblock.check_network_urls(r["url"], source_url=doc_url, request_type=r["type"])
+ is False
]
# remove duplicates
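A hedged sketch of the pattern shape consumed by html_extract_fulltext_url above, using the keys the function reads (selector, attr, optional in_doc_url / in_fulltext_url, technique). The selector and URLs are invented for illustration:

from selectolax.parser import HTMLParser
from sandcrawler.html_metadata import html_extract_fulltext_url

# hypothetical pattern, same shape as entries in the fulltext pattern lists
patterns = [
    {
        "selector": "a.download-pdf",
        "attr": "href",
        "in_fulltext_url": ".pdf",
        "technique": "example-anchor",
    },
]

doc_url = "https://example.com/article/123"  # placeholder
doc = HTMLParser('<a class="download-pdf" href="https://example.com/files/123.pdf">PDF</a>')

print(html_extract_fulltext_url(doc_url, doc, patterns))
# ("https://example.com/files/123.pdf", "example-anchor") if matched, else None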
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index 8f28d42..99a7f36 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -34,50 +34,63 @@ class SandcrawlerBackoffError(Exception):
be passed up through any timeout/retry code and become an actual long pause
or crash.
"""
+
pass
-ResourceResult = namedtuple("ResourceResult", [
- "start_url",
- "hit",
- "status",
- "terminal_url",
- "terminal_dt",
- "terminal_status_code",
- "body",
- "cdx",
- "revisit_cdx",
-])
-
-WarcResource = namedtuple("WarcResource", [
- "status_code",
- "location",
- "body",
- "revisit_cdx",
-])
-
-CdxRow = namedtuple('CdxRow', [
- 'surt',
- 'datetime',
- 'url',
- 'mimetype',
- 'status_code',
- 'sha1b32',
- 'sha1hex',
- 'warc_csize',
- 'warc_offset',
- 'warc_path',
-])
-
-CdxPartial = namedtuple('CdxPartial', [
- 'surt',
- 'datetime',
- 'url',
- 'mimetype',
- 'status_code',
- 'sha1b32',
- 'sha1hex',
-])
+ResourceResult = namedtuple(
+ "ResourceResult",
+ [
+ "start_url",
+ "hit",
+ "status",
+ "terminal_url",
+ "terminal_dt",
+ "terminal_status_code",
+ "body",
+ "cdx",
+ "revisit_cdx",
+ ],
+)
+
+WarcResource = namedtuple(
+ "WarcResource",
+ [
+ "status_code",
+ "location",
+ "body",
+ "revisit_cdx",
+ ],
+)
+
+CdxRow = namedtuple(
+ "CdxRow",
+ [
+ "surt",
+ "datetime",
+ "url",
+ "mimetype",
+ "status_code",
+ "sha1b32",
+ "sha1hex",
+ "warc_csize",
+ "warc_offset",
+ "warc_path",
+ ],
+)
+
+CdxPartial = namedtuple(
+ "CdxPartial",
+ [
+ "surt",
+ "datetime",
+ "url",
+ "mimetype",
+ "status_code",
+ "sha1b32",
+ "sha1hex",
+ ],
+)
def cdx_partial_from_row(row: Union[CdxRow, CdxPartial]) -> CdxPartial:
@@ -102,10 +115,10 @@ def cdx_to_dict(cdx: Union[CdxRow, CdxPartial]) -> Dict[str, Any]:
"sha1b32": cdx.sha1b32,
"sha1hex": cdx.sha1hex,
}
- if type(cdx) == CdxRow and '/' in cdx.warc_path:
- d['warc_csize'] = cdx.warc_csize
- d['warc_offset'] = cdx.warc_offset
- d['warc_path'] = cdx.warc_path
+ if type(cdx) == CdxRow and "/" in cdx.warc_path:
+ d["warc_csize"] = cdx.warc_csize
+ d["warc_offset"] = cdx.warc_offset
+ d["warc_path"] = cdx.warc_path
return d
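A small sketch of the row-to-dict conversion above; the WARC fields are only emitted for a full CdxRow whose warc_path contains a slash (i.e. looks like a real petabox path). All values are placeholders:

from sandcrawler.ia import CdxRow, cdx_to_dict

row = CdxRow(
    surt="org,example)/article/123",
    datetime="20200101120000",
    url="https://example.com/article/123",
    mimetype="application/pdf",
    status_code=200,
    sha1b32="ABCDEFGHIJKLMNOPQRSTUVWXYZ234567",  # placeholder
    sha1hex="0" * 40,                            # placeholder
    warc_csize=12345,
    warc_offset=67890,
    warc_path="example-collection/example.warc.gz",
)
d = cdx_to_dict(row)
assert d["warc_path"] == "example-collection/example.warc.gz"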
@@ -116,9 +129,9 @@ def fuzzy_match_url(left: str, right: str) -> bool:
"""
if left == right:
return True
- if '://' in left and '://' in right:
- left = '://'.join(left.split('://')[1:])
- right = '://'.join(right.split('://')[1:])
+ if "://" in left and "://" in right:
+ left = "://".join(left.split("://")[1:])
+ right = "://".join(right.split("://")[1:])
if left == right:
return True
if left == right + "/" or right == left + "/":
@@ -149,14 +162,17 @@ class CdxApiClient:
def __init__(self, host_url: str = "https://web.archive.org/cdx/search/cdx", **kwargs):
self.host_url = host_url
self.http_session = requests_retry_session(retries=3, backoff_factor=3)
- cdx_auth_token = kwargs.get('cdx_auth_token', os.environ.get('CDX_AUTH_TOKEN'))
+ cdx_auth_token = kwargs.get("cdx_auth_token", os.environ.get("CDX_AUTH_TOKEN"))
if not cdx_auth_token:
raise Exception(
- "CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)")
- self.http_session.headers.update({
- 'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient',
- 'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token),
- })
+ "CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)"
+ )
+ self.http_session.headers.update(
+ {
+ "User-Agent": "Mozilla/5.0 sandcrawler.CdxApiClient",
+ "Cookie": "cdx_auth_token={}".format(cdx_auth_token),
+ }
+ )
def _query_api(self, params: Dict[str, str]) -> Optional[List[CdxRow]]:
"""
@@ -165,7 +181,7 @@ class CdxApiClient:
resp = self.http_session.get(self.host_url, params=params)
if resp.status_code != 200:
raise CdxApiError(resp.text)
- #print(resp.url, file=sys.stderr)
+ # print(resp.url, file=sys.stderr)
if not resp.text:
return None
rj = resp.json()
@@ -187,7 +203,7 @@ class CdxApiClient:
status_code = int(raw[4])
# CDX rows with no WARC records?
- if raw[8] == '-' or raw[9] == '-' or raw[10] == '-':
+ if raw[8] == "-" or raw[9] == "-" or raw[10] == "-":
continue
row = CdxRow(
@@ -206,28 +222,31 @@ class CdxApiClient:
rows.append(row)
return rows
- def fetch(self,
- url: str,
- datetime: str,
- filter_status_code: Optional[int] = None,
- retry_sleep: Optional[int] = None) -> CdxRow:
+ def fetch(
+ self,
+ url: str,
+ datetime: str,
+ filter_status_code: Optional[int] = None,
+ retry_sleep: Optional[int] = None,
+ ) -> CdxRow:
"""
Fetches a single CDX row by url/datetime. Raises a KeyError if not
found, because we expect to be looking up a specific full record.
"""
if len(datetime) != 14:
raise ValueError(
- "CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime))
+ "CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime)
+ )
params: Dict[str, str] = {
- 'url': url,
- 'from': datetime,
- 'to': datetime,
- 'matchType': 'exact',
- 'limit': "1",
- 'output': 'json',
+ "url": url,
+ "from": datetime,
+ "to": datetime,
+ "matchType": "exact",
+ "limit": "1",
+ "output": "json",
}
if filter_status_code:
- params['filter'] = "statuscode:{}".format(filter_status_code)
+ params["filter"] = "statuscode:{}".format(filter_status_code)
resp = self._query_api(params)
if not resp:
if retry_sleep and retry_sleep > 0:
@@ -235,37 +254,43 @@ class CdxApiClient:
if retry_sleep > 3:
next_sleep = retry_sleep - 3
retry_sleep = 3
- print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
- file=sys.stderr)
+ print(
+ " CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
+ file=sys.stderr,
+ )
time.sleep(retry_sleep)
- return self.fetch(url,
- datetime,
- filter_status_code=filter_status_code,
- retry_sleep=next_sleep)
+ return self.fetch(
+ url, datetime, filter_status_code=filter_status_code, retry_sleep=next_sleep
+ )
raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime))
row = resp[0]
# allow fuzzy http/https match
if not (fuzzy_match_url(row.url, url) and row.datetime == datetime):
if retry_sleep and retry_sleep > 0:
- print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
- file=sys.stderr)
+ print(
+ " CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
+ file=sys.stderr,
+ )
time.sleep(retry_sleep)
- return self.fetch(url,
- datetime,
- filter_status_code=filter_status_code,
- retry_sleep=None)
+ return self.fetch(
+ url, datetime, filter_status_code=filter_status_code, retry_sleep=None
+ )
raise KeyError(
"Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(
- url, datetime, row))
+ url, datetime, row
+ )
+ )
if filter_status_code:
assert row.status_code == filter_status_code
return row
- def lookup_best(self,
- url: str,
- max_age_days: Optional[int] = None,
- best_mimetype: Optional[str] = None,
- closest: Union[datetime.datetime, str, None] = None) -> Optional[CdxRow]:
+ def lookup_best(
+ self,
+ url: str,
+ max_age_days: Optional[int] = None,
+ best_mimetype: Optional[str] = None,
+ closest: Union[datetime.datetime, str, None] = None,
+ ) -> Optional[CdxRow]:
"""
Fetches multiple CDX rows for the given URL, tries to find the most recent.
@@ -289,27 +314,26 @@ class CdxApiClient:
"""
params: Dict[str, str] = {
- 'url': url,
- 'matchType': 'exact',
- 'limit': "-25",
- 'output': 'json',
+ "url": url,
+ "matchType": "exact",
+ "limit": "-25",
+ "output": "json",
# Collapsing seems efficient, but is complex; would need to include
# other filters and status code in filter
#'collapse': 'timestamp:6',
-
# Revisits now allowed and resolved!
#'filter': '!mimetype:warc/revisit',
}
if max_age_days:
since = datetime.date.today() - datetime.timedelta(days=max_age_days)
- params['from'] = '%04d%02d%02d' % (since.year, since.month, since.day)
+ params["from"] = "%04d%02d%02d" % (since.year, since.month, since.day)
if closest:
if isinstance(closest, datetime.datetime):
- params['closest'] = '%04d%02d%02d' % (closest.year, closest.month, closest.day)
+ params["closest"] = "%04d%02d%02d" % (closest.year, closest.month, closest.day)
else:
- params['closest'] = closest
- params['sort'] = "closest"
- #print(params, file=sys.stderr)
+ params["closest"] = closest
+ params["sort"] = "closest"
+ # print(params, file=sys.stderr)
rows = self._query_api(params)
if not rows:
return None
@@ -326,7 +350,7 @@ class CdxApiClient:
int(r.mimetype == best_mimetype),
int(r.mimetype != "warc/revisit"),
int(r.datetime[:6]),
- int('/' in r.warc_path),
+ int("/" in r.warc_path),
int(r.datetime),
)
@@ -358,25 +382,23 @@ class WaybackClient:
self.cdx_client = CdxApiClient()
# /serve/ instead of /download/ doesn't record view count
# this *does* want to be http://, not https://
- self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/')
+ self.petabox_base_url = kwargs.get("petabox_base_url", "http://archive.org/serve/")
# gwb library will fall back to reading from /opt/.petabox/webdata.secret
self.petabox_webdata_secret = kwargs.get(
- 'petabox_webdata_secret',
- os.environ.get('PETABOX_WEBDATA_SECRET'),
+ "petabox_webdata_secret",
+ os.environ.get("PETABOX_WEBDATA_SECRET"),
)
- self.warc_uri_prefix = kwargs.get('warc_uri_prefix', 'https://archive.org/serve/')
+ self.warc_uri_prefix = kwargs.get("warc_uri_prefix", "https://archive.org/serve/")
self.rstore = None
self.max_redirects = 25
self.wayback_endpoint = "https://web.archive.org/web/"
self.replay_headers = {
- 'User-Agent': 'Mozilla/5.0 sandcrawler.WaybackClient',
+ "User-Agent": "Mozilla/5.0 sandcrawler.WaybackClient",
}
- def fetch_petabox(self,
- csize: int,
- offset: int,
- warc_path: str,
- resolve_revisit: bool = True) -> WarcResource:
+ def fetch_petabox(
+ self, csize: int, offset: int, warc_path: str, resolve_revisit: bool = True
+ ) -> WarcResource:
"""
Fetches wayback resource directly from petabox using WARC path/offset/csize.
@@ -401,37 +423,49 @@ class WaybackClient:
raise Exception("WaybackClient needs petabox secret to do direct WARC fetches")
if "/" not in warc_path:
raise ValueError(
- "what looks like a liveweb/SPN temporary warc path: {}".format(warc_path))
+ "what looks like a liveweb/SPN temporary warc path: {}".format(warc_path)
+ )
warc_uri = self.warc_uri_prefix + warc_path
if not self.rstore:
self.rstore = ResourceStore(
- loaderfactory=CDXLoaderFactory3(webdata_secret=self.petabox_webdata_secret, ))
+ loaderfactory=CDXLoaderFactory3(
+ webdata_secret=self.petabox_webdata_secret,
+ )
+ )
assert self.rstore
try:
- #print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr)
+ # print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr)
gwb_record = self.rstore.load_resource(warc_uri, offset, csize)
except wayback.exception.ResourceUnavailable:
print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
raise PetaboxError(
- "failed to load file contents from wayback/petabox (ResourceUnavailable)")
+ "failed to load file contents from wayback/petabox (ResourceUnavailable)"
+ )
except wayback.exception.InvalidResource:
print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
raise WaybackContentError(
- "failed to load file contents from wayback/petabox (InvalidResource)")
+ "failed to load file contents from wayback/petabox (InvalidResource)"
+ )
except urllib3.exceptions.ReadTimeoutError as rte:
raise PetaboxError(
- "failed to load file contents from wayback/petabox (ReadTimeoutError: {})".
- format(rte))
+ "failed to load file contents from wayback/petabox (ReadTimeoutError: {})".format(
+ rte
+ )
+ )
except ValueError as ve:
raise PetaboxError(
- "failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
+ "failed to load file contents from wayback/petabox (ValueError: {})".format(ve)
+ )
except EOFError as eofe:
raise PetaboxError(
- "failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ "failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)
+ )
except TypeError as te:
raise PetaboxError(
- "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)"
- .format(te))
+ "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(
+ te
+ )
+ )
except Exception as e:
if "while decompressing data: invalid block type" in str(e):
raise PetaboxError(
@@ -449,8 +483,11 @@ class WaybackClient:
raise WaybackContentError("too many HTTP headers (in wayback fetch)")
location = gwb_record.get_location() or None
- if status_code is None and gwb_record.target_uri.startswith(
- b"ftp://") and not gwb_record.is_revisit():
+ if (
+ status_code is None
+ and gwb_record.target_uri.startswith(b"ftp://")
+ and not gwb_record.is_revisit()
+ ):
# TODO: some additional verification here?
status_code = 226
@@ -463,17 +500,19 @@ class WaybackClient:
if not (revisit_uri and revisit_dt):
raise WaybackContentError(
"revisit record missing URI and/or DT: warc:{} offset:{}".format(
- warc_path, offset))
+ warc_path, offset
+ )
+ )
# convert revisit_dt
# len("2018-07-24T11:56:49"), or with "Z"
assert len(revisit_dt) in (19, 20)
if type(revisit_uri) is bytes:
- revisit_uri = revisit_uri.decode('utf-8')
+ revisit_uri = revisit_uri.decode("utf-8")
if type(revisit_dt) is bytes:
- revisit_dt = revisit_dt.decode('utf-8')
- revisit_dt = revisit_dt.replace('-', '').replace(':',
- '').replace('T',
- '').replace('Z', '')
+ revisit_dt = revisit_dt.decode("utf-8")
+ revisit_dt = (
+ revisit_dt.replace("-", "").replace(":", "").replace("T", "").replace("Z", "")
+ )
assert len(revisit_dt) == 14
try:
revisit_cdx = self.cdx_client.fetch(revisit_uri, revisit_dt)
@@ -491,8 +530,10 @@ class WaybackClient:
body = gwb_record.open_raw_content().read()
except IncompleteRead as ire:
raise WaybackError(
- "failed to read actual file contents from wayback/petabox (IncompleteRead: {})"
- .format(ire))
+ "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(
+ ire
+ )
+ )
elif status_code is None:
raise WaybackContentError("got a None status_code in (W)ARC record")
return WarcResource(
@@ -502,12 +543,14 @@ class WaybackClient:
revisit_cdx=revisit_cdx,
)
- def fetch_petabox_body(self,
- csize: int,
- offset: int,
- warc_path: str,
- resolve_revisit: bool = True,
- expected_status_code: Optional[int] = None) -> bytes:
+ def fetch_petabox_body(
+ self,
+ csize: int,
+ offset: int,
+ warc_path: str,
+ resolve_revisit: bool = True,
+ expected_status_code: Optional[int] = None,
+ ) -> bytes:
"""
Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize.
@@ -524,20 +567,22 @@ class WaybackClient:
if expected_status_code:
if expected_status_code != resource.status_code:
- raise KeyError("archived HTTP response (WARC) was not {}: {}".format(
- expected_status_code,
- resource.status_code,
- ))
+ raise KeyError(
+ "archived HTTP response (WARC) was not {}: {}".format(
+ expected_status_code,
+ resource.status_code,
+ )
+ )
elif resource.status_code not in (200, 226):
- raise KeyError("archived HTTP response (WARC) was not 200: {}".format(
- resource.status_code))
+ raise KeyError(
+ "archived HTTP response (WARC) was not 200: {}".format(resource.status_code)
+ )
return resource.body
- def fetch_replay_body(self,
- url: str,
- datetime: str,
- cdx_sha1hex: Optional[str] = None) -> bytes:
+ def fetch_replay_body(
+ self, url: str, datetime: str, cdx_sha1hex: Optional[str] = None
+ ) -> bytes:
"""
Fetches an HTTP 200 record from wayback via the replay interface
(web.archive.org) instead of petabox.
@@ -570,32 +615,42 @@ class WaybackClient:
except UnicodeDecodeError:
raise WaybackContentError(
"UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(
- url))
+ url
+ )
+ )
try:
resp.raise_for_status()
except Exception as e:
raise WaybackError(str(e))
- #print(resp.url, file=sys.stderr)
+ # print(resp.url, file=sys.stderr)
# defensively check that this is actually correct replay based on headers
if "X-Archive-Src" not in resp.headers:
raise WaybackError("replay fetch didn't return X-Archive-Src in headers")
if datetime not in resp.url:
- raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(
- datetime, resp.url))
+ raise WaybackError(
+ "didn't get exact reply (redirect?) datetime:{} got:{}".format(
+ datetime, resp.url
+ )
+ )
if cdx_sha1hex:
# verify that body matches CDX hash
# TODO: don't need *all* these hashes, just sha1
file_meta = gen_file_metadata(resp.content)
- if cdx_sha1hex != file_meta['sha1hex']:
- print(" REPLAY MISMATCH: cdx:{} replay:{}".format(cdx_sha1hex,
- file_meta['sha1hex']),
- file=sys.stderr)
+ if cdx_sha1hex != file_meta["sha1hex"]:
+ print(
+ " REPLAY MISMATCH: cdx:{} replay:{}".format(
+ cdx_sha1hex, file_meta["sha1hex"]
+ ),
+ file=sys.stderr,
+ )
raise WaybackContentError(
"replay fetch body didn't match CDX hash cdx:{} body:{}".format(
- cdx_sha1hex, file_meta['sha1hex']), )
+ cdx_sha1hex, file_meta["sha1hex"]
+ ),
+ )
return resp.content
def fetch_replay_redirect(self, url: str, datetime: str) -> Optional[str]:
@@ -625,37 +680,44 @@ class WaybackClient:
except UnicodeDecodeError:
raise WaybackContentError(
"UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(
- url))
+ url
+ )
+ )
try:
resp.raise_for_status()
except Exception as e:
raise WaybackError(str(e))
- #print(resp.url, file=sys.stderr)
+ # print(resp.url, file=sys.stderr)
# defensively check that this is actually correct replay based on headers
# previously check for "X-Archive-Redirect-Reason" here
if "X-Archive-Src" not in resp.headers:
raise WaybackError("redirect replay fetch didn't return X-Archive-Src in headers")
if datetime not in resp.url:
- raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(
- datetime, resp.url))
+ raise WaybackError(
+ "didn't get exact reply (redirect?) datetime:{} got:{}".format(
+ datetime, resp.url
+ )
+ )
redirect_url = resp.headers.get("Location")
# eg, https://web.archive.org/web/20200111003923id_/https://dx.doi.org/10.17504/protocols.io.y2gfybw
- #print(redirect_url, file=sys.stderr)
+ # print(redirect_url, file=sys.stderr)
if redirect_url and redirect_url.startswith("https://web.archive.org/web/"):
redirect_url = "/".join(redirect_url.split("/")[5:])
- #print(redirect_url, file=sys.stderr)
+ # print(redirect_url, file=sys.stderr)
if redirect_url and redirect_url.startswith("http"):
redirect_url = clean_url(redirect_url)
return redirect_url
else:
return None
- def lookup_resource(self,
- start_url: str,
- best_mimetype: Optional[str] = None,
- closest: Union[str, datetime.datetime, None] = None) -> ResourceResult:
+ def lookup_resource(
+ self,
+ start_url: str,
+ best_mimetype: Optional[str] = None,
+ closest: Union[str, datetime.datetime, None] = None,
+ ) -> ResourceResult:
"""
Looks in wayback for a resource starting at the URL, following any
redirects. Returns a ResourceResult object, which may indicate a
@@ -684,8 +746,9 @@ class WaybackClient:
for i in range(self.max_redirects + 1):
print(" URL: {}".format(next_url), file=sys.stderr)
next_row: Optional[CdxRow] = self.cdx_client.lookup_best(
- next_url, best_mimetype=best_mimetype, closest=closest)
- #print(next_row, file=sys.stderr)
+ next_url, best_mimetype=best_mimetype, closest=closest
+ )
+ # print(next_row, file=sys.stderr)
if not next_row:
return ResourceResult(
start_url=start_url,
@@ -702,7 +765,7 @@ class WaybackClient:
cdx_row: CdxRow = next_row
# first try straight-forward redirect situation
- if cdx_row.mimetype == "warc/revisit" and '/' in cdx_row.warc_path:
+ if cdx_row.mimetype == "warc/revisit" and "/" in cdx_row.warc_path:
resource = self.fetch_petabox(
csize=cdx_row.warc_csize,
offset=cdx_row.warc_offset,
@@ -725,7 +788,7 @@ class WaybackClient:
if cdx_row.status_code in (200, 226):
revisit_cdx = None
final_cdx: Union[CdxRow, CdxPartial] = cdx_row
- if '/' in cdx_row.warc_path:
+ if "/" in cdx_row.warc_path:
resource = self.fetch_petabox(
csize=cdx_row.warc_csize,
offset=cdx_row.warc_offset,
@@ -751,7 +814,7 @@ class WaybackClient:
revisit_cdx=revisit_cdx,
)
elif 300 <= (cdx_row.status_code or 0) < 400:
- if '/' in cdx_row.warc_path:
+ if "/" in cdx_row.warc_path:
resource = self.fetch_petabox(
csize=cdx_row.warc_csize,
offset=cdx_row.warc_offset,
@@ -848,34 +911,39 @@ class SavePageNowBackoffError(SandcrawlerBackoffError):
pass
-SavePageNowResult = namedtuple('SavePageNowResult', [
- 'success',
- 'status',
- 'job_id',
- 'request_url',
- 'terminal_url',
- 'terminal_dt',
- 'resources',
-])
+SavePageNowResult = namedtuple(
+ "SavePageNowResult",
+ [
+ "success",
+ "status",
+ "job_id",
+ "request_url",
+ "terminal_url",
+ "terminal_dt",
+ "resources",
+ ],
+)
class SavePageNowClient:
def __init__(self, v2endpoint: str = "https://web.archive.org/save", **kwargs):
- self.ia_access_key = kwargs.get('ia_access_key', os.environ.get('IA_ACCESS_KEY'))
- self.ia_secret_key = kwargs.get('ia_secret_key', os.environ.get('IA_SECRET_KEY'))
+ self.ia_access_key = kwargs.get("ia_access_key", os.environ.get("IA_ACCESS_KEY"))
+ self.ia_secret_key = kwargs.get("ia_secret_key", os.environ.get("IA_SECRET_KEY"))
self.v2endpoint = v2endpoint
self.v2_session = requests_retry_session(retries=5, backoff_factor=3)
- self.v2_session.headers.update({
- 'User-Agent': 'Mozilla/5.0 sandcrawler.SavePageNowClient',
- 'Accept': 'application/json',
- 'Authorization': 'LOW {}:{}'.format(self.ia_access_key, self.ia_secret_key),
- })
+ self.v2_session.headers.update(
+ {
+ "User-Agent": "Mozilla/5.0 sandcrawler.SavePageNowClient",
+ "Accept": "application/json",
+ "Authorization": "LOW {}:{}".format(self.ia_access_key, self.ia_secret_key),
+ }
+ )
# 3 minutes total
self.poll_count = 60
self.poll_seconds = 3.0
- self.spn_cdx_retry_sec = kwargs.get('spn_cdx_retry_sec', 9.0)
+ self.spn_cdx_retry_sec = kwargs.get("spn_cdx_retry_sec", 9.0)
# these are special-case web domains for which we want SPN2 to not run
# a headless browser (brozzler), but instead simply run wget.
@@ -888,20 +956,20 @@ class SavePageNowClient:
"://europepmc.org/backend/ptpmcrender.fcgi",
"://pdfs.semanticscholar.org/",
"://res.mdpi.com/",
-
# platform sites
"://zenodo.org/",
"://figshare.org/",
"://springernature.figshare.com/",
-
# popular simple cloud storage or direct links
"://s3-eu-west-1.amazonaws.com/",
]
- def save_url_now_v2(self,
- request_url: str,
- force_simple_get: Optional[int] = None,
- capture_outlinks: int = 0) -> SavePageNowResult:
+ def save_url_now_v2(
+ self,
+ request_url: str,
+ force_simple_get: Optional[int] = None,
+ capture_outlinks: int = 0,
+ ) -> SavePageNowResult:
"""
Returns a "SavePageNowResult" (namedtuple) if SPN request was processed
at all, or raises an exception if there was an error with SPN itself.
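A hedged usage sketch of the SPNv2 call whose docstring ends above; credentials are read from IA_ACCESS_KEY / IA_SECRET_KEY (environment or kwargs) and the request URL is a placeholder:

from sandcrawler.ia import SavePageNowBackoffError, SavePageNowClient, SavePageNowError

spn_client = SavePageNowClient()  # reads IA_ACCESS_KEY / IA_SECRET_KEY from environment

try:
    spn_result = spn_client.save_url_now_v2("https://example.com/paper.pdf")
    if spn_result.success:
        print("captured:", spn_result.terminal_url, spn_result.terminal_dt)
    else:
        print("SPN2 did not capture:", spn_result.status)
except SavePageNowBackoffError:
    # rate-limited or too many active sessions; caller should pause and retry later
    pass
except SavePageNowError as exc:
    print("SPN2 error:", exc)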
@@ -944,33 +1012,39 @@ class SavePageNowClient:
resp = self.v2_session.post(
self.v2endpoint,
data={
- 'url': request_url,
- 'capture_all': 1,
- 'capture_outlinks': capture_outlinks,
- 'capture_screenshot': 0,
- 'if_not_archived_within': '1d',
- 'force_get': force_simple_get,
- 'skip_first_archive': 1,
- 'outlinks_availability': 0,
- 'js_behavior_timeout': 0,
+ "url": request_url,
+ "capture_all": 1,
+ "capture_outlinks": capture_outlinks,
+ "capture_screenshot": 0,
+ "if_not_archived_within": "1d",
+ "force_get": force_simple_get,
+ "skip_first_archive": 1,
+ "outlinks_availability": 0,
+ "js_behavior_timeout": 0,
},
)
if resp.status_code == 429:
- raise SavePageNowBackoffError("status_code: {}, url: {}".format(
- resp.status_code, request_url))
+ raise SavePageNowBackoffError(
+ "status_code: {}, url: {}".format(resp.status_code, request_url)
+ )
elif resp.status_code != 200:
- raise SavePageNowError("SPN2 status_code: {}, url: {}".format(
- resp.status_code, request_url))
+ raise SavePageNowError(
+ "SPN2 status_code: {}, url: {}".format(resp.status_code, request_url)
+ )
resp_json = resp.json()
- if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json[
- 'message']:
- raise SavePageNowBackoffError(resp_json['message'])
- elif not resp_json or 'job_id' not in resp_json or not resp_json['job_id']:
+ if (
+ resp_json
+ and "message" in resp_json
+ and "You have already reached the limit of active sessions" in resp_json["message"]
+ ):
+ raise SavePageNowBackoffError(resp_json["message"])
+ elif not resp_json or "job_id" not in resp_json or not resp_json["job_id"]:
raise SavePageNowError(
- "Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json))
+ "Didn't get expected 'job_id' field in SPN2 response: {}".format(resp_json)
+ )
- job_id = resp_json['job_id']
+ job_id = resp_json["job_id"]
print(f" SPNv2 running: job_id={job_id} url={request_url}", file=sys.stderr)
# poll until complete
@@ -981,53 +1055,59 @@ class SavePageNowClient:
resp.raise_for_status()
except Exception:
raise SavePageNowError(resp.content)
- status = resp.json()['status']
- if status == 'pending':
+ status = resp.json()["status"]
+ if status == "pending":
time.sleep(self.poll_seconds)
- elif status in ('success', 'error'):
+ elif status in ("success", "error"):
final_json = resp.json()
break
else:
- raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(
- status, request_url))
+ raise SavePageNowError(
+ "Unknown SPN2 status:{} url:{}".format(status, request_url)
+ )
if not final_json:
raise SavePageNowError("SPN2 timed out (polling count exceeded)")
# if there was a recent crawl of same URL, fetch the status of that
# crawl to get correct datetime
- if final_json.get('original_job_id'):
- print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}",
- file=sys.stderr)
- resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint,
- final_json['original_job_id']))
+ if final_json.get("original_job_id"):
+ print(
+ f" SPN recent capture: {job_id} -> {final_json['original_job_id']}",
+ file=sys.stderr,
+ )
+ resp = self.v2_session.get(
+ "{}/status/{}".format(self.v2endpoint, final_json["original_job_id"])
+ )
try:
resp.raise_for_status()
except Exception:
raise SavePageNowError(resp.content)
final_json = resp.json()
- #print(final_json, file=sys.stderr)
+ # print(final_json, file=sys.stderr)
- if final_json['status'] == "success":
- if final_json.get('original_url').startswith('/'):
- print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}",
- file=sys.stderr)
+ if final_json["status"] == "success":
+ if final_json.get("original_url").startswith("/"):
+ print(
+ f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}",
+ file=sys.stderr,
+ )
return SavePageNowResult(
True,
"success",
job_id,
request_url,
- final_json['original_url'],
- final_json['timestamp'],
- final_json['resources'],
+ final_json["original_url"],
+ final_json["timestamp"],
+ final_json["resources"],
)
else:
- if final_json['status'] == 'pending':
- final_json['status'] = 'error:pending'
+ if final_json["status"] == "pending":
+ final_json["status"] = "error:pending"
return SavePageNowResult(
False,
- final_json.get('status_ext') or final_json['status'],
+ final_json.get("status_ext") or final_json["status"],
job_id,
request_url,
None,
@@ -1035,10 +1115,12 @@ class SavePageNowClient:
None,
)
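
Aside, not part of the diff: a hypothetical usage sketch of the method above. It assumes IA_ACCESS_KEY / IA_SECRET_KEY are set in the environment and uses a made-up URL; the fields read from the returned SavePageNowResult are those defined earlier in this file.

from sandcrawler.ia import SavePageNowClient

client = SavePageNowClient()
spn = client.save_url_now_v2("https://example.com/paper.pdf")  # hypothetical URL
if spn.success:
    print("captured:", spn.terminal_url, spn.terminal_dt)
else:
    print("SPN2 capture failed, status:", spn.status)
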
- def crawl_resource(self,
- start_url: str,
- wayback_client: WaybackClient,
- force_simple_get: Optional[int] = None) -> ResourceResult:
+ def crawl_resource(
+ self,
+ start_url: str,
+ wayback_client: WaybackClient,
+ force_simple_get: Optional[int] = None,
+ ) -> ResourceResult:
"""
Runs a SPN2 crawl, then fetches body.
@@ -1048,18 +1130,23 @@ class SavePageNowClient:
"""
# HACK: capture CNKI domains with outlinks (for COVID-19 crawling)
- if 'gzbd.cnki.net/' in start_url:
- spn_result = self.save_url_now_v2(start_url,
- force_simple_get=force_simple_get,
- capture_outlinks=1)
+ if "gzbd.cnki.net/" in start_url:
+ spn_result = self.save_url_now_v2(
+ start_url, force_simple_get=force_simple_get, capture_outlinks=1
+ )
else:
spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get)
if not spn_result.success:
status = spn_result.status
- if status in ("error:invalid-url", "error:not-found",
- "error:invalid-host-resolution", "error:gateway-timeout",
- "error:too-many-redirects", "error:read-timeout"):
+ if status in (
+ "error:invalid-url",
+ "error:not-found",
+ "error:invalid-host-resolution",
+ "error:gateway-timeout",
+ "error:too-many-redirects",
+ "error:read-timeout",
+ ):
status = status.replace("error:", "")
elif status in ("error:no-access", "error:forbidden"):
status = "forbidden"
@@ -1070,8 +1157,10 @@ class SavePageNowClient:
elif status.startswith("error:"):
status = "spn2-" + status
# despite other errors, call these a failure (so we don't retry)
- if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent')
- or spn_result.terminal_url.endswith("cookieSet=1")):
+ if spn_result.terminal_url and (
+ spn_result.terminal_url.endswith("/cookieAbsent")
+ or spn_result.terminal_url.endswith("cookieSet=1")
+ ):
status = "blocked-cookie"
return ResourceResult(
start_url=start_url,
@@ -1084,10 +1173,10 @@ class SavePageNowClient:
cdx=None,
revisit_cdx=None,
)
- #print(spn_result, file=sys.stderr)
+ # print(spn_result, file=sys.stderr)
# detect partial URL response (aka, success, but missing full URL)
- if "://" not in spn_result.terminal_url or spn_result.terminal_url.startswith('/'):
+ if "://" not in spn_result.terminal_url or spn_result.terminal_url.startswith("/"):
return ResourceResult(
start_url=start_url,
hit=False,
@@ -1102,7 +1191,8 @@ class SavePageNowClient:
# don't try to CDX fetch for this common cookie block terminal
if spn_result.terminal_url.endswith(
- '/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"):
+ "/cookieAbsent"
+ ) or spn_result.terminal_url.endswith("cookieSet=1"):
return ResourceResult(
start_url=start_url,
hit=False,
@@ -1127,7 +1217,7 @@ class SavePageNowClient:
cdx_row = elsevier_pdf_cdx
else:
print(" Failed pdf.sciencedirectassets.com hack!", file=sys.stderr)
- #print(elsevier_pdf_cdx, file=sys.stderr)
+ # print(elsevier_pdf_cdx, file=sys.stderr)
if not cdx_row:
# lookup exact
@@ -1164,11 +1254,11 @@ class SavePageNowClient:
revisit_cdx=None,
)
- #print(cdx_row, file=sys.stderr)
+ # print(cdx_row, file=sys.stderr)
revisit_cdx = None
final_cdx: Union[CdxRow, CdxPartial] = cdx_row
- if '/' in cdx_row.warc_path:
+ if "/" in cdx_row.warc_path:
# Usually can't do this kind of direct fetch because CDX result is recent/live
resource = wayback_client.fetch_petabox(
csize=cdx_row.warc_csize,
@@ -1228,12 +1318,19 @@ class SavePageNowClient:
)
-def fix_transfer_encoding(file_meta: dict,
- resource: ResourceResult) -> Tuple[dict, ResourceResult]:
- if resource.body and file_meta[
- 'mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
- print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype),
- file=sys.stderr)
+def fix_transfer_encoding(
+ file_meta: dict, resource: ResourceResult
+) -> Tuple[dict, ResourceResult]:
+ if (
+ resource.body
+ and file_meta["mimetype"] == "application/gzip"
+ and resource.cdx
+ and resource.cdx.mimetype != "application/gzip"
+ ):
+ print(
+ " transfer encoding not stripped: {}".format(resource.cdx.mimetype),
+ file=sys.stderr,
+ )
inner_body = gzip.decompress(resource.body)
if not inner_body:
raise Exception("null body inside transfer encoding")
diff --git a/python/sandcrawler/ingest_file.py b/python/sandcrawler/ingest_file.py
index 49c7ddf..4a5abbe 100644
--- a/python/sandcrawler/ingest_file.py
+++ b/python/sandcrawler/ingest_file.py
@@ -10,15 +10,32 @@ from selectolax.parser import HTMLParser
from sandcrawler.db import SandcrawlerPostgrestClient
from sandcrawler.grobid import GrobidClient
from sandcrawler.html import extract_fulltext_url
-from sandcrawler.html_metadata import (html_extract_biblio, html_extract_resources,
- load_adblock_rules)
-from sandcrawler.ia import (CdxApiError, NoCaptureError, PetaboxError, ResourceResult,
- SavePageNowClient, SavePageNowError, WaybackClient,
- WaybackContentError, WaybackError, cdx_to_dict,
- fix_transfer_encoding)
-from sandcrawler.ingest_html import (WebResource, fetch_html_resources,
- html_extract_body_teixml, html_guess_platform,
- html_guess_scope, quick_fetch_html_resources)
+from sandcrawler.html_metadata import (
+ html_extract_biblio,
+ html_extract_resources,
+ load_adblock_rules,
+)
+from sandcrawler.ia import (
+ CdxApiError,
+ NoCaptureError,
+ PetaboxError,
+ ResourceResult,
+ SavePageNowClient,
+ SavePageNowError,
+ WaybackClient,
+ WaybackContentError,
+ WaybackError,
+ cdx_to_dict,
+ fix_transfer_encoding,
+)
+from sandcrawler.ingest_html import (
+ WebResource,
+ fetch_html_resources,
+ html_extract_body_teixml,
+ html_guess_platform,
+ html_guess_scope,
+ quick_fetch_html_resources,
+)
from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime
from sandcrawler.pdfextract import PdfExtractResult, process_pdf
from sandcrawler.workers import SandcrawlerWorker
@@ -53,74 +70,71 @@ class IngestFileWorker(SandcrawlerWorker):
process_file_hit(ResourceResult) -> response
process_grobid(ResourceResult)
"""
+
def __init__(self, sink: Optional[SandcrawlerWorker] = None, **kwargs):
super().__init__()
self.sink = sink
- if kwargs.get('wayback_client'):
- self.wayback_client: WaybackClient = kwargs['wayback_client']
+ if kwargs.get("wayback_client"):
+ self.wayback_client: WaybackClient = kwargs["wayback_client"]
else:
self.wayback_client = WaybackClient()
- if kwargs.get('spn_client'):
- self.spn_client: SavePageNowClient = kwargs['spn_client']
+ if kwargs.get("spn_client"):
+ self.spn_client: SavePageNowClient = kwargs["spn_client"]
else:
self.spn_client = SavePageNowClient(
- spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0))
+ spn_cdx_retry_sec=kwargs.get("spn_cdx_retry_sec", 9.0)
+ )
- if kwargs.get('grobid_client'):
- self.grobid_client: GrobidClient = kwargs['grobid_client']
+ if kwargs.get("grobid_client"):
+ self.grobid_client: GrobidClient = kwargs["grobid_client"]
else:
self.grobid_client = GrobidClient()
- if kwargs.get('pgrest_client'):
- self.pgrest_client: SandcrawlerPostgrestClient = kwargs['pgrest_client']
+ if kwargs.get("pgrest_client"):
+ self.pgrest_client: SandcrawlerPostgrestClient = kwargs["pgrest_client"]
else:
self.pgrest_client = SandcrawlerPostgrestClient()
- self.grobid_sink = kwargs.get('grobid_sink')
- self.thumbnail_sink = kwargs.get('thumbnail_sink')
- self.pdftext_sink = kwargs.get('pdftext_sink')
- self.xmldoc_sink = kwargs.get('xmldoc_sink')
- self.htmlteixml_sink = kwargs.get('htmlteixml_sink')
+ self.grobid_sink = kwargs.get("grobid_sink")
+ self.thumbnail_sink = kwargs.get("thumbnail_sink")
+ self.pdftext_sink = kwargs.get("pdftext_sink")
+ self.xmldoc_sink = kwargs.get("xmldoc_sink")
+ self.htmlteixml_sink = kwargs.get("htmlteixml_sink")
self.max_hops = 6
- self.try_existing_ingest = kwargs.get('try_existing_ingest', False)
- self.try_existing_grobid = kwargs.get('try_existing_grobid', True)
- self.try_existing_pdfextract = kwargs.get('try_existing_pdfextract', True)
- self.try_wayback = kwargs.get('try_wayback', True)
- self.try_spn2 = kwargs.get('try_spn2', True)
- self.html_quick_mode = kwargs.get('html_quick_mode', False)
+ self.try_existing_ingest = kwargs.get("try_existing_ingest", False)
+ self.try_existing_grobid = kwargs.get("try_existing_grobid", True)
+ self.try_existing_pdfextract = kwargs.get("try_existing_pdfextract", True)
+ self.try_wayback = kwargs.get("try_wayback", True)
+ self.try_spn2 = kwargs.get("try_spn2", True)
+ self.html_quick_mode = kwargs.get("html_quick_mode", False)
self.adblock_rules = load_adblock_rules()
self.max_html_resources = 200
self.base_url_blocklist = [
# robot blocking
"://hkvalidate.perfdrive.com/",
-
# temporary, until we implement specific fetch and 'petabox' output
"://archive.org/",
"://www.archive.org/",
"://web.archive.org/web/",
-
# out of scope
"://openlibrary.org/",
"://www.openlibrary.org/",
"://fatcat.wiki/",
"://orcid.org/",
"://doaj.org/",
-
# Domain squats
"://bartandjones.com",
"://ijretm.com",
"://ijrcemas.com",
"://jist.net.in",
"://croisements-revue.org",
-
# all stubs/previews, not full papers
"://page-one.live.cf.public.springer.com",
-
# large datasets-only (no PDF expected)
"plutof.ut.ee/",
"www.gbif.org/",
@@ -129,16 +143,13 @@ class IngestFileWorker(SandcrawlerWorker):
"://doi.org/10.25642/ipk/gbis/",
"://apex.ipk-gatersleben.de/",
"fao.org/glis/",
-
# Historical non-paper content:
"dhz.uni-passau.de/", # newspapers
"digital.ucd.ie/", # ireland national historical
-
# DOI prefixes
"doi.org/10.2307/", # JSTOR; slow and many redirects
"doi.org/10.18730/", # fao.org: database entry
"doi.org/10.15468/", # gbif.org: database entry
-
# deprecated domain (doesn't redirect correctly)
"://edoc.mpg.de/",
]
@@ -216,15 +227,14 @@ class IngestFileWorker(SandcrawlerWorker):
return None
existing = self.pgrest_client.get_ingest_file_result(ingest_type, base_url)
# TODO: filter on more flags?
- if existing and existing['hit'] is True:
+ if existing and existing["hit"] is True:
return existing
else:
return None
- def find_resource(self,
- url: str,
- best_mimetype: Optional[str] = None,
- force_recrawl: bool = False) -> Optional[ResourceResult]:
+ def find_resource(
+ self, url: str, best_mimetype: Optional[str] = None, force_recrawl: bool = False
+ ) -> Optional[ResourceResult]:
"""
Looks in wayback for a resource starting at the URL, following any
redirects. If a hit isn't found, try crawling with SPN.
@@ -233,7 +243,8 @@ class IngestFileWorker(SandcrawlerWorker):
resource = None
if url.startswith("http://web.archive.org/web/") or url.startswith(
- "https://web.archive.org/web/"):
+ "https://web.archive.org/web/"
+ ):
raise NotImplementedError("handling direct wayback links not supported yet")
if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"):
@@ -247,20 +258,32 @@ class IngestFileWorker(SandcrawlerWorker):
soft404 = False
# NOTE: these are often not working with SPNv2 either, so disabling. If
# we really want to try again, should do force-recrawl
- #if resource and resource.hit and resource.terminal_url.endswith('/cookieAbsent'):
+ # if resource and resource.hit and resource.terminal_url.endswith('/cookieAbsent'):
# soft404 = True
old_failure = False
- if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000':
+ if (
+ resource
+ and not resource.hit
+ and resource.terminal_dt
+ and resource.terminal_dt < "20190000000000"
+ ):
old_failure = True
- if self.try_spn2 and (resource is None or (resource and resource.status == 'no-capture')
- or soft404 or old_failure):
+ if self.try_spn2 and (
+ resource is None
+ or (resource and resource.status == "no-capture")
+ or soft404
+ or old_failure
+ ):
via = "spn2"
resource = self.spn_client.crawl_resource(url, self.wayback_client)
- print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status),
- (resource and resource.terminal_url) or url),
- file=sys.stderr)
+ print(
+ "[FETCH {:>6}] {} {}".format(
+ via, (resource and resource.status), (resource and resource.terminal_url) or url
+ ),
+ file=sys.stderr,
+ )
return resource
def process_existing(self, request: dict, result_row: dict) -> dict:
@@ -269,51 +292,55 @@ class IngestFileWorker(SandcrawlerWorker):
additional processing necessary to return a result.
"""
raise NotImplementedError("process_existing() not tested or safe yet")
- assert result_row['hit']
- existing_file_meta = self.pgrest_client.get_file_meta(result_row['terminal_sha1hex'])
- existing_grobid = self.pgrest_client.get_grobid(result_row['terminal_sha1hex'])
- existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'],
- result_row['terminal_dt'])
+ assert result_row["hit"]
+ existing_file_meta = self.pgrest_client.get_file_meta(result_row["terminal_sha1hex"])
+ existing_grobid = self.pgrest_client.get_grobid(result_row["terminal_sha1hex"])
+ existing_cdx = self.pgrest_client.get_cdx(
+ result_row["terminal_url"], result_row["terminal_dt"]
+ )
if not (existing_file_meta and existing_grobid and existing_cdx):
raise NotImplementedError("partially-exsiting records not implemented yet")
result = {
- 'hit': result_row['hit'],
- 'status': "existing",
- 'request': request,
- 'grobid': existing_grobid,
- 'file_meta': existing_file_meta,
- 'cdx': existing_cdx,
- 'terminal': {
- 'terminal_url': result_row['terminal_url'],
- 'terminal_dt': result_row['terminal_dt'],
- 'terminal_status_code': result_row['terminal_status_code'],
- 'terminal_sha1hex': result_row['terminal_sha1hex'],
+ "hit": result_row["hit"],
+ "status": "existing",
+ "request": request,
+ "grobid": existing_grobid,
+ "file_meta": existing_file_meta,
+ "cdx": existing_cdx,
+ "terminal": {
+ "terminal_url": result_row["terminal_url"],
+ "terminal_dt": result_row["terminal_dt"],
+ "terminal_status_code": result_row["terminal_status_code"],
+ "terminal_sha1hex": result_row["terminal_sha1hex"],
},
}
return result
- def process_file_hit(self, ingest_type: str, resource: ResourceResult,
- file_meta: dict) -> dict:
+ def process_file_hit(
+ self, ingest_type: str, resource: ResourceResult, file_meta: dict
+ ) -> dict:
"""
Run all the necessary processing for a new/fresh ingest hit.
"""
- if ingest_type in ["dataset-file", "component"
- ] and file_meta['mimetype'] == "application/pdf":
+ if (
+ ingest_type in ["dataset-file", "component"]
+ and file_meta["mimetype"] == "application/pdf"
+ ):
ingest_type = "pdf"
if ingest_type == "pdf":
return {
- 'grobid': self.process_grobid(resource, file_meta),
- 'pdf_meta': self.process_pdfextract(resource, file_meta),
+ "grobid": self.process_grobid(resource, file_meta),
+ "pdf_meta": self.process_pdfextract(resource, file_meta),
}
elif ingest_type == "xml":
return {
- 'xml_meta': self.process_xml(resource, file_meta),
+ "xml_meta": self.process_xml(resource, file_meta),
}
elif ingest_type == "html":
html_info = self.process_html(resource, file_meta)
# if there is no html_biblio, don't clobber anything possibly extracted earlier
- if 'html_biblio' in html_info and not html_info['html_biblio']:
- html_info.pop('html_biblio')
+ if "html_biblio" in html_info and not html_info["html_biblio"]:
+ html_info.pop("html_biblio")
return html_info
elif ingest_type == "src":
return {}
@@ -332,7 +359,7 @@ class IngestFileWorker(SandcrawlerWorker):
decide if we should re-process
"""
if self.try_existing_grobid:
- existing = self.pgrest_client.get_grobid(file_meta['sha1hex'])
+ existing = self.pgrest_client.get_grobid(file_meta["sha1hex"])
if existing:
print("found existing GROBID result", file=sys.stderr)
return existing
@@ -341,18 +368,18 @@ class IngestFileWorker(SandcrawlerWorker):
result = self.grobid_client.process_fulltext(resource.body)
if self.grobid_sink:
# extra fields for GROBID kafka messages
- result['file_meta'] = file_meta
- result['key'] = result['file_meta']['sha1hex']
+ result["file_meta"] = file_meta
+ result["key"] = result["file_meta"]["sha1hex"]
self.grobid_sink.push_record(result.copy())
- if result['status'] == "success":
+ if result["status"] == "success":
metadata = self.grobid_client.metadata(result)
if metadata:
- result['metadata'] = self.grobid_client.metadata(result)
- result['fatcat_release'] = result['metadata'].pop('fatcat_release', None)
- result['grobid_version'] = result['metadata'].pop('grobid_version', None)
- result.pop('tei_xml', None)
- result.pop('file_meta', None)
- result.pop('key', None)
+ result["metadata"] = self.grobid_client.metadata(result)
+ result["fatcat_release"] = result["metadata"].pop("fatcat_release", None)
+ result["grobid_version"] = result["metadata"].pop("grobid_version", None)
+ result.pop("tei_xml", None)
+ result.pop("file_meta", None)
+ result.pop("key", None)
return result
def process_pdfextract(self, resource: ResourceResult, file_meta: dict) -> dict:
@@ -365,7 +392,7 @@ class IngestFileWorker(SandcrawlerWorker):
TODO: difference between Kafka schema and SQL/postgrest schema
"""
if self.try_existing_pdfextract:
- existing = self.pgrest_client.get_pdf_meta(file_meta['sha1hex'])
+ existing = self.pgrest_client.get_pdf_meta(file_meta["sha1hex"])
if existing:
print("found existing pdf_meta result", file=sys.stderr)
result = PdfExtractResult.from_pdf_meta_dict(existing)
@@ -373,9 +400,9 @@ class IngestFileWorker(SandcrawlerWorker):
# Need to actually process
result = process_pdf(resource.body)
- assert result.sha1hex == file_meta['sha1hex']
+ assert result.sha1hex == file_meta["sha1hex"]
assert result.file_meta is not None
- assert result.file_meta['sha1hex'] == file_meta['sha1hex']
+ assert result.file_meta["sha1hex"] == file_meta["sha1hex"]
if self.thumbnail_sink and result.page0_thumbnail is not None:
self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
if self.pdftext_sink:
@@ -392,7 +419,7 @@ class IngestFileWorker(SandcrawlerWorker):
In the future, could extract other metadata here (like body word
count), or attempt to fetch sub-resources.
"""
- if self.xmldoc_sink and file_meta['mimetype'] == "application/jats+xml":
+ if self.xmldoc_sink and file_meta["mimetype"] == "application/jats+xml":
try:
jats_xml = xml_reserialize(resource.body)
except xml.etree.ElementTree.ParseError:
@@ -402,7 +429,7 @@ class IngestFileWorker(SandcrawlerWorker):
status="success",
jats_xml=jats_xml,
)
- self.xmldoc_sink.push_record(msg, key=file_meta['sha1hex'])
+ self.xmldoc_sink.push_record(msg, key=file_meta["sha1hex"])
return dict(status="success")
def process_html(self, resource: ResourceResult, file_meta: dict) -> dict:
@@ -416,11 +443,12 @@ class IngestFileWorker(SandcrawlerWorker):
assert html_biblio
html_body = html_extract_body_teixml(resource.body)
html_platform = html_guess_platform(resource.terminal_url, html_doc, html_biblio)
- html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio,
- html_body.get('word_count'))
+ html_scope = html_guess_scope(
+ resource.terminal_url, html_doc, html_biblio, html_body.get("word_count")
+ )
html_biblio_dict = json.loads(html_biblio.json(exclude_none=True))
- if html_scope in ('blocked-captcha', 'blocked-cookie', 'blocked-forbidden'):
+ if html_scope in ("blocked-captcha", "blocked-cookie", "blocked-forbidden"):
return dict(
status=html_scope,
html_biblio=html_biblio_dict,
@@ -428,8 +456,8 @@ class IngestFileWorker(SandcrawlerWorker):
platform=html_platform,
)
elif html_scope not in (
- 'article-fulltext',
- 'unknown',
+ "article-fulltext",
+ "unknown",
):
html_body.pop("tei_xml", None)
return dict(
@@ -440,8 +468,9 @@ class IngestFileWorker(SandcrawlerWorker):
html_body=html_body,
)
- raw_resources = html_extract_resources(resource.terminal_url, html_doc,
- self.adblock_rules)
+ raw_resources = html_extract_resources(
+ resource.terminal_url, html_doc, self.adblock_rules
+ )
if len(raw_resources) > self.max_html_resources:
html_body.pop("tei_xml", None)
return dict(
@@ -452,8 +481,8 @@ class IngestFileWorker(SandcrawlerWorker):
html_body=html_body,
)
- if self.htmlteixml_sink and html_body['status'] == "success":
- self.htmlteixml_sink.push_record(html_body, key=file_meta['sha1hex'])
+ if self.htmlteixml_sink and html_body["status"] == "success":
+ self.htmlteixml_sink.push_record(html_body, key=file_meta["sha1hex"])
html_body.pop("tei_xml", None)
@@ -470,30 +499,30 @@ class IngestFileWorker(SandcrawlerWorker):
try:
if self.html_quick_mode:
print(" WARN: running quick CDX-only fetches", file=sys.stderr)
- full_resources = quick_fetch_html_resources(raw_resources,
- self.wayback_client.cdx_client,
- when)
+ full_resources = quick_fetch_html_resources(
+ raw_resources, self.wayback_client.cdx_client, when
+ )
else:
full_resources = fetch_html_resources(raw_resources, self.wayback_client, when)
except PetaboxError as e:
- partial_result['status'] = 'petabox-error'
- partial_result['error_message'] = str(e)[:1600]
+ partial_result["status"] = "petabox-error"
+ partial_result["error_message"] = str(e)[:1600]
return partial_result
except CdxApiError as e:
- partial_result['status'] = 'cdx-error'
- partial_result['error_message'] = str(e)[:1600]
+ partial_result["status"] = "cdx-error"
+ partial_result["error_message"] = str(e)[:1600]
return partial_result
except WaybackError as e:
- partial_result['status'] = 'wayback-error'
- partial_result['error_message'] = str(e)[:1600]
+ partial_result["status"] = "wayback-error"
+ partial_result["error_message"] = str(e)[:1600]
return partial_result
except WaybackContentError as e:
- partial_result['status'] = 'wayback-content-error'
- partial_result['error_message'] = str(e)[:1600]
+ partial_result["status"] = "wayback-content-error"
+ partial_result["error_message"] = str(e)[:1600]
return partial_result
except NoCaptureError as e:
- partial_result['status'] = 'html-resource-no-capture'
- partial_result['error_message'] = str(e)[:1600]
+ partial_result["status"] = "html-resource-no-capture"
+ partial_result["error_message"] = str(e)[:1600]
return partial_result
info = dict(
@@ -503,8 +532,8 @@ class IngestFileWorker(SandcrawlerWorker):
platform=html_platform,
html_resources=[json.loads(r.json(exclude_none=True)) for r in full_resources],
)
- if html_scope == 'unknown':
- info['status'] = 'unknown-scope'
+ if html_scope == "unknown":
+ info["status"] = "unknown-scope"
return info
def timeout_response(self, task: dict) -> dict:
@@ -517,7 +546,7 @@ class IngestFileWorker(SandcrawlerWorker):
)
def want(self, request: dict) -> bool:
- if not request.get('ingest_type') in ('file', 'pdf', 'xml', 'html', 'src', 'component'):
+ if not request.get("ingest_type") in ("file", "pdf", "xml", "html", "src", "component"):
return False
return True
@@ -527,19 +556,19 @@ class IngestFileWorker(SandcrawlerWorker):
def process_file(self, request: dict, key: Any = None) -> dict:
# old backwards compatibility
- if request.get('ingest_type') == 'file':
- request['ingest_type'] = 'pdf'
+ if request.get("ingest_type") == "file":
+ request["ingest_type"] = "pdf"
- ingest_type = request.get('ingest_type')
+ ingest_type = request.get("ingest_type")
if ingest_type not in ("pdf", "xml", "html", "src", "component"):
raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
# parse/clean URL
# note that we pass through the original/raw URL, and that is what gets
# persisted in database table
- base_url = clean_url(request['base_url'])
+ base_url = clean_url(request["base_url"])
- force_recrawl = bool(request.get('force_recrawl', False))
+ force_recrawl = bool(request.get("force_recrawl", False))
for block in self.base_url_blocklist:
if block in base_url:
@@ -569,112 +598,113 @@ class IngestFileWorker(SandcrawlerWorker):
while len(hops) <= self.max_hops:
- result['hops'] = hops
+ result["hops"] = hops
# check against blocklist again on each hop
for block in self.base_url_blocklist:
if block in next_url:
- result['status'] = "skip-url-blocklist"
+ result["status"] = "skip-url-blocklist"
return result
# check against known loginwall URLs
for block in self.wall_blocklist:
if block in next_url:
# TODO: blocked-wall instead of skip-wall
- result['status'] = "skip-wall"
+ result["status"] = "skip-wall"
return result
# check for popular cookie blocking URL patterns. On successful SPN
# crawls, shouldn't see these redirect URLs
for pattern in self.cookie_blocklist:
if pattern in next_url:
- result['status'] = 'blocked-cookie'
+ result["status"] = "blocked-cookie"
return result
try:
- resource = self.find_resource(next_url,
- best_mimetype,
- force_recrawl=force_recrawl)
+ resource = self.find_resource(
+ next_url, best_mimetype, force_recrawl=force_recrawl
+ )
except SavePageNowError as e:
- result['status'] = 'spn2-error'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "spn2-error"
+ result["error_message"] = str(e)[:1600]
return result
except PetaboxError as e:
- result['status'] = 'petabox-error'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "petabox-error"
+ result["error_message"] = str(e)[:1600]
return result
except CdxApiError as e:
- result['status'] = 'cdx-error'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "cdx-error"
+ result["error_message"] = str(e)[:1600]
# add a sleep in cdx-error path as a slow-down
time.sleep(2.0)
return result
except WaybackError as e:
- result['status'] = 'wayback-error'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "wayback-error"
+ result["error_message"] = str(e)[:1600]
return result
except WaybackContentError as e:
- result['status'] = 'wayback-content-error'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "wayback-content-error"
+ result["error_message"] = str(e)[:1600]
return result
except NotImplementedError as e:
- result['status'] = 'not-implemented'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "not-implemented"
+ result["error_message"] = str(e)[:1600]
return result
assert resource
if resource.terminal_url:
- result['terminal'] = {
+ result["terminal"] = {
"terminal_url": resource.terminal_url,
"terminal_dt": resource.terminal_dt,
"terminal_status_code": resource.terminal_status_code,
}
- if resource.terminal_url not in result['hops']:
- result['hops'].append(resource.terminal_url)
+ if resource.terminal_url not in result["hops"]:
+ result["hops"].append(resource.terminal_url)
if not resource.hit:
- result['status'] = resource.status
+ result["status"] = resource.status
return result
if resource.terminal_url:
for pattern in self.base_url_blocklist:
if pattern in resource.terminal_url:
- result['status'] = 'skip-url-blocklist'
+ result["status"] = "skip-url-blocklist"
return result
if resource.terminal_url:
for pattern in self.cookie_blocklist:
if pattern in resource.terminal_url:
- result['status'] = 'blocked-cookie'
+ result["status"] = "blocked-cookie"
return result
if not resource.body:
- result['status'] = 'null-body'
+ result["status"] = "null-body"
return result
if len(resource.body) > MAX_BODY_SIZE_BYTES:
- result['status'] = 'body-too-large'
+ result["status"] = "body-too-large"
return result
file_meta = gen_file_metadata(resource.body)
try:
file_meta, resource = fix_transfer_encoding(file_meta, resource)
except Exception as e:
- result['status'] = 'bad-gzip-encoding'
- result['error_message'] = str(e)
+ result["status"] = "bad-gzip-encoding"
+ result["error_message"] = str(e)
return result
- if not resource.body or file_meta['size_bytes'] == 0:
- result['status'] = 'null-body'
+ if not resource.body or file_meta["size_bytes"] == 0:
+ result["status"] = "null-body"
return result
# here we split based on ingest type to try and extract a next hop
html_ish_resource = bool(
- "html" in file_meta['mimetype']
- or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
- or "application/xml" in file_meta['mimetype']
- or "text/xml" in file_meta['mimetype'])
+ "html" in file_meta["mimetype"]
+ or "xhtml" in file_meta["mimetype"] # matches "application/xhtml+xml"
+ or "application/xml" in file_meta["mimetype"]
+ or "text/xml" in file_meta["mimetype"]
+ )
html_biblio = None
html_doc = None
if html_ish_resource and resource.body:
@@ -682,10 +712,11 @@ class IngestFileWorker(SandcrawlerWorker):
html_doc = HTMLParser(resource.body)
html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
if html_biblio:
- if 'html_biblio' not in result and html_biblio.title:
- result['html_biblio'] = json.loads(
- html_biblio.json(exclude_none=True))
- #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
+ if "html_biblio" not in result and html_biblio.title:
+ result["html_biblio"] = json.loads(
+ html_biblio.json(exclude_none=True)
+ )
+ # print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
except ValueError:
pass
@@ -700,27 +731,32 @@ class IngestFileWorker(SandcrawlerWorker):
else:
fulltext_url = extract_fulltext_url(resource.terminal_url, resource.body)
- result['extract_next_hop'] = fulltext_url
+ result["extract_next_hop"] = fulltext_url
if not fulltext_url:
- result['status'] = 'no-pdf-link'
+ result["status"] = "no-pdf-link"
return result
- next_url = fulltext_url.get('pdf_url') or fulltext_url.get('next_url') or ""
+ next_url = fulltext_url.get("pdf_url") or fulltext_url.get("next_url") or ""
assert next_url
next_url = clean_url(next_url)
- print("[PARSE {:>6}] {} {}".format(
- ingest_type,
- fulltext_url.get('technique'),
- next_url,
- ),
- file=sys.stderr)
+ print(
+ "[PARSE {:>6}] {} {}".format(
+ ingest_type,
+ fulltext_url.get("technique"),
+ next_url,
+ ),
+ file=sys.stderr,
+ )
if next_url in hops:
- result['status'] = 'link-loop'
- result['error_message'] = "repeated: {}".format(next_url)
+ result["status"] = "link-loop"
+ result["error_message"] = "repeated: {}".format(next_url)
return result
hops.append(next_url)
continue
- elif ingest_type in ("xml", "html",
- "component") and html_ish_resource and html_biblio:
+ elif (
+ ingest_type in ("xml", "html", "component")
+ and html_ish_resource
+ and html_biblio
+ ):
# NOTE: src_fulltext_url is not a thing
next_url_found = None
if ingest_type == "xml" and html_biblio.xml_fulltext_url:
@@ -733,18 +769,20 @@ class IngestFileWorker(SandcrawlerWorker):
if next_url_found:
next_url = next_url_found
technique = "html_biblio"
- print("[PARSE {:>6}] {} {}".format(
- ingest_type,
- technique,
- next_url,
- ),
- file=sys.stderr)
+ print(
+ "[PARSE {:>6}] {} {}".format(
+ ingest_type,
+ technique,
+ next_url,
+ ),
+ file=sys.stderr,
+ )
if next_url in hops:
if ingest_type == "html":
# for HTML ingest, we don't count this as a link-loop
break
- result['status'] = 'link-loop'
- result['error_message'] = "repeated: {}".format(next_url)
+ result["status"] = "link-loop"
+ result["error_message"] = "repeated: {}".format(next_url)
return result
hops.append(next_url)
continue
@@ -753,7 +791,7 @@ class IngestFileWorker(SandcrawlerWorker):
break
if len(hops) >= self.max_hops:
- result['status'] = "max-hops-exceeded"
+ result["status"] = "max-hops-exceeded"
return result
# fetch must be a hit if we got this far (though not necessarily an ingest hit!)
@@ -762,38 +800,41 @@ class IngestFileWorker(SandcrawlerWorker):
assert resource.terminal_status_code in (200, 226)
if resource.terminal_url:
- result['terminal'] = {
+ result["terminal"] = {
"terminal_url": resource.terminal_url,
"terminal_dt": resource.terminal_dt,
"terminal_status_code": resource.terminal_status_code,
- "terminal_sha1hex": file_meta['sha1hex'],
+ "terminal_sha1hex": file_meta["sha1hex"],
}
- result['file_meta'] = file_meta
- result['cdx'] = cdx_to_dict(resource.cdx)
+ result["file_meta"] = file_meta
+ result["cdx"] = cdx_to_dict(resource.cdx)
if resource.revisit_cdx:
- result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
+ result["revisit_cdx"] = cdx_to_dict(resource.revisit_cdx)
if ingest_type == "pdf":
- if file_meta['mimetype'] != "application/pdf":
- result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
+ if file_meta["mimetype"] != "application/pdf":
+ result["status"] = "wrong-mimetype" # formerly: "other-mimetype"
return result
elif ingest_type == "xml":
- if file_meta['mimetype'] not in ("application/xml", "text/xml",
- "application/jats+xml"):
- result['status'] = "wrong-mimetype"
+ if file_meta["mimetype"] not in (
+ "application/xml",
+ "text/xml",
+ "application/jats+xml",
+ ):
+ result["status"] = "wrong-mimetype"
return result
elif ingest_type == "html":
- if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"):
- result['status'] = "wrong-mimetype"
+ if file_meta["mimetype"] not in ("text/html", "application/xhtml+xml"):
+ result["status"] = "wrong-mimetype"
return result
elif ingest_type == "src":
- if file_meta['mimetype'] not in self.src_valid_mimetypes:
- result['status'] = "wrong-mimetype"
+ if file_meta["mimetype"] not in self.src_valid_mimetypes:
+ result["status"] = "wrong-mimetype"
return result
elif ingest_type == "component":
- if file_meta['mimetype'] not in self.component_valid_mimetypes:
- result['status'] = "wrong-mimetype"
+ if file_meta["mimetype"] not in self.component_valid_mimetypes:
+ result["status"] = "wrong-mimetype"
return result
else:
raise NotImplementedError()
@@ -802,26 +843,30 @@ class IngestFileWorker(SandcrawlerWorker):
result.update(info)
# check if processing turned up an error
- if info.get('status') not in ('success', None):
- result['status'] = info['status']
+ if info.get("status") not in ("success", None):
+ result["status"] = info["status"]
return result
- result['status'] = "success"
- result['hit'] = True
+ result["status"] = "success"
+ result["hit"] = True
if ingest_type == "pdf":
- print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format(
- ingest_type,
- result.get('file_meta', {}).get('sha1hex'),
- result.get('grobid', {}).get('status_code'),
- result.get('pdf_meta', {}).get('status'),
- ),
- file=sys.stderr)
+ print(
+ "[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format(
+ ingest_type,
+ result.get("file_meta", {}).get("sha1hex"),
+ result.get("grobid", {}).get("status_code"),
+ result.get("pdf_meta", {}).get("status"),
+ ),
+ file=sys.stderr,
+ )
else:
- print("[SUCCESS {:>5}] sha1:{}".format(
- ingest_type,
- result.get('file_meta', {}).get('sha1hex'),
- ),
- file=sys.stderr)
+ print(
+ "[SUCCESS {:>5}] sha1:{}".format(
+ ingest_type,
+ result.get("file_meta", {}).get("sha1hex"),
+ ),
+ file=sys.stderr,
+ )
return result
@@ -832,11 +877,11 @@ class IngestFileRequestHandler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(b"404: Not Found")
return
- length = int(self.headers.get('content-length'))
- request = json.loads(self.rfile.read(length).decode('utf-8'))
+ length = int(self.headers.get("content-length"))
+ request = json.loads(self.rfile.read(length).decode("utf-8"))
print("Got request: {}".format(request))
ingester = IngestFileWorker()
result = ingester.process(request)
self.send_response(200)
self.end_headers()
- self.wfile.write(json.dumps(result).encode('utf8'))
+ self.wfile.write(json.dumps(result).encode("utf8"))
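
Aside, not part of the diff: a minimal sketch of driving IngestFileWorker directly, the way the HTTP request handler above does. Actually running it needs wayback/petabox access, GROBID, postgrest, and (unless disabled) SPN credentials; the URL is hypothetical.

from sandcrawler.ingest_file import IngestFileWorker

worker = IngestFileWorker(try_spn2=False)  # keyword options as in __init__ above
request = {
    "ingest_type": "pdf",
    "base_url": "https://example.com/article/fulltext.pdf",  # hypothetical
}
if worker.want(request):
    result = worker.process(request)
    print(result.get("status"), result.get("hit"))
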
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index 5728e24..227f511 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -6,15 +6,33 @@ from typing import Any, Dict, Optional
import requests
from selectolax.parser import HTMLParser
-from sandcrawler.fileset_platforms import (ArchiveOrgHelper, DataverseHelper, FigshareHelper,
- ZenodoHelper)
-from sandcrawler.fileset_strategies import (ArchiveorgFilesetStrategy, ArchiveorgFileStrategy,
- WebFilesetStrategy, WebFileStrategy)
-from sandcrawler.fileset_types import (IngestStrategy, PlatformRestrictedError,
- PlatformScopeError)
+from sandcrawler.fileset_platforms import (
+ ArchiveOrgHelper,
+ DataverseHelper,
+ FigshareHelper,
+ ZenodoHelper,
+)
+from sandcrawler.fileset_strategies import (
+ ArchiveorgFilesetStrategy,
+ ArchiveorgFileStrategy,
+ WebFilesetStrategy,
+ WebFileStrategy,
+)
+from sandcrawler.fileset_types import (
+ IngestStrategy,
+ PlatformRestrictedError,
+ PlatformScopeError,
+)
from sandcrawler.html_metadata import html_extract_biblio
-from sandcrawler.ia import (CdxApiError, PetaboxError, SavePageNowError, WaybackContentError,
- WaybackError, cdx_to_dict, fix_transfer_encoding)
+from sandcrawler.ia import (
+ CdxApiError,
+ PetaboxError,
+ SavePageNowError,
+ WaybackContentError,
+ WaybackError,
+ cdx_to_dict,
+ fix_transfer_encoding,
+)
from sandcrawler.ingest_file import IngestFileWorker
from sandcrawler.misc import clean_url, gen_file_metadata
from sandcrawler.workers import SandcrawlerWorker
@@ -35,15 +53,16 @@ class IngestFilesetWorker(IngestFileWorker):
checking to see if content has been archived already)
4. summarize status
"""
+
def __init__(self, sink: Optional[SandcrawlerWorker] = None, **kwargs):
super().__init__(sink=None, **kwargs)
self.sink = sink
self.dataset_platform_helpers = {
- 'dataverse': DataverseHelper(),
- 'figshare': FigshareHelper(),
- 'zenodo': ZenodoHelper(),
- 'archiveorg': ArchiveOrgHelper(),
+ "dataverse": DataverseHelper(),
+ "figshare": FigshareHelper(),
+ "zenodo": ZenodoHelper(),
+ "archiveorg": ArchiveOrgHelper(),
}
self.dataset_strategy_archivers = {
IngestStrategy.ArchiveorgFileset: ArchiveorgFilesetStrategy(),
@@ -52,10 +71,10 @@ class IngestFilesetWorker(IngestFileWorker):
IngestStrategy.WebFile: WebFileStrategy(),
}
- self.max_total_size = kwargs.get('max_total_size', 64 * 1024 * 1024 * 1024)
- self.max_file_count = kwargs.get('max_file_count', 200)
- self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink')
- self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False)
+ self.max_total_size = kwargs.get("max_total_size", 64 * 1024 * 1024 * 1024)
+ self.max_file_count = kwargs.get("max_file_count", 200)
+ self.ingest_file_result_sink = kwargs.get("ingest_file_result_sink")
+ self.ingest_file_result_stdout = kwargs.get("ingest_file_result_stdout", False)
def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
"""
@@ -65,7 +84,7 @@ class IngestFilesetWorker(IngestFileWorker):
return None
existing = self.pgrest_client.get_ingest_fileset_platform(ingest_type, base_url)
# TODO: filter on more flags?
- if existing and existing['hit'] is True:
+ if existing and existing["hit"] is True:
return existing
else:
return None
@@ -78,112 +97,114 @@ class IngestFilesetWorker(IngestFileWorker):
raise NotImplementedError("process_existing() not tested or safe yet")
def want(self, request: dict) -> bool:
- if not request.get('ingest_type') in ('dataset', ):
+ if not request.get("ingest_type") in ("dataset",):
return False
return True
- def fetch_resource_iteratively(self, ingest_type: str, base_url: str,
- force_recrawl: bool) -> dict:
+ def fetch_resource_iteratively(
+ self, ingest_type: str, base_url: str, force_recrawl: bool
+ ) -> dict:
"""
This is copypasta from process_file(), should probably refactor.
"""
result: Dict[str, Any] = dict(hit=False)
- result['hops'] = [base_url]
+ result["hops"] = [base_url]
next_url = base_url
# check against blocklist
for block in self.base_url_blocklist:
# NOTE: hack to not skip archive.org content
- if 'archive.org' in block:
+ if "archive.org" in block:
continue
if block in next_url:
- result['status'] = "skip-url-blocklist"
+ result["status"] = "skip-url-blocklist"
return result
try:
resource = self.find_resource(next_url, force_recrawl=force_recrawl)
except SavePageNowError as e:
- result['status'] = 'spn2-error'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "spn2-error"
+ result["error_message"] = str(e)[:1600]
return result
except PetaboxError as e:
- result['status'] = 'petabox-error'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "petabox-error"
+ result["error_message"] = str(e)[:1600]
return result
except CdxApiError as e:
- result['status'] = 'cdx-error'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "cdx-error"
+ result["error_message"] = str(e)[:1600]
# add a sleep in cdx-error path as a slow-down
time.sleep(2.0)
return result
except WaybackError as e:
- result['status'] = 'wayback-error'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "wayback-error"
+ result["error_message"] = str(e)[:1600]
return result
except WaybackContentError as e:
- result['status'] = 'wayback-content-error'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "wayback-content-error"
+ result["error_message"] = str(e)[:1600]
return result
except NotImplementedError:
- #result['status'] = 'not-implemented'
- #result['error_message'] = str(e)[:1600]
- #return result
+ # result['status'] = 'not-implemented'
+ # result['error_message'] = str(e)[:1600]
+ # return result
resource = None
html_biblio = None
if resource:
if resource.terminal_url:
- result['terminal'] = {
+ result["terminal"] = {
"terminal_url": resource.terminal_url,
"terminal_dt": resource.terminal_dt,
"terminal_status_code": resource.terminal_status_code,
}
- if resource.terminal_url not in result['hops']:
- result['hops'].append(resource.terminal_url)
+ if resource.terminal_url not in result["hops"]:
+ result["hops"].append(resource.terminal_url)
if not resource.hit:
- result['status'] = resource.status
+ result["status"] = resource.status
return result
if resource.terminal_url:
for pattern in self.base_url_blocklist:
if pattern in resource.terminal_url:
- result['status'] = 'skip-url-blocklist'
+ result["status"] = "skip-url-blocklist"
return result
if resource.terminal_url:
for pattern in self.cookie_blocklist:
if pattern in resource.terminal_url:
- result['status'] = 'blocked-cookie'
+ result["status"] = "blocked-cookie"
return result
if not resource.body:
- result['status'] = 'null-body'
+ result["status"] = "null-body"
return result
if len(resource.body) > MAX_BODY_SIZE_BYTES:
- result['status'] = 'body-too-large'
+ result["status"] = "body-too-large"
return result
file_meta = gen_file_metadata(resource.body)
try:
file_meta, resource = fix_transfer_encoding(file_meta, resource)
except Exception as e:
- result['status'] = 'bad-gzip-encoding'
- result['error_message'] = str(e)
+ result["status"] = "bad-gzip-encoding"
+ result["error_message"] = str(e)
return result
- if not resource.body or file_meta['size_bytes'] == 0:
- result['status'] = 'null-body'
+ if not resource.body or file_meta["size_bytes"] == 0:
+ result["status"] = "null-body"
return result
# here we split based on ingest type to try and extract a next hop
html_ish_resource = bool(
- "html" in file_meta['mimetype']
- or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
- or "application/xml" in file_meta['mimetype']
- or "text/xml" in file_meta['mimetype'])
+ "html" in file_meta["mimetype"]
+ or "xhtml" in file_meta["mimetype"] # matches "application/xhtml+xml"
+ or "application/xml" in file_meta["mimetype"]
+ or "text/xml" in file_meta["mimetype"]
+ )
html_biblio = None
html_doc = None
if html_ish_resource and resource.body:
@@ -191,10 +212,11 @@ class IngestFilesetWorker(IngestFileWorker):
html_doc = HTMLParser(resource.body)
html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
if html_biblio:
- if 'html_biblio' not in result and html_biblio.title:
- result['html_biblio'] = json.loads(
- html_biblio.json(exclude_none=True))
- #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
+ if "html_biblio" not in result and html_biblio.title:
+ result["html_biblio"] = json.loads(
+ html_biblio.json(exclude_none=True)
+ )
+ # print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
except ValueError:
pass
@@ -204,69 +226,72 @@ class IngestFilesetWorker(IngestFileWorker):
assert resource.terminal_status_code in (200, 226)
if resource.terminal_url:
- result['terminal'] = {
+ result["terminal"] = {
"terminal_url": resource.terminal_url,
"terminal_dt": resource.terminal_dt,
"terminal_status_code": resource.terminal_status_code,
- "terminal_sha1hex": file_meta['sha1hex'],
+ "terminal_sha1hex": file_meta["sha1hex"],
}
- result['file_meta'] = file_meta
- result['cdx'] = cdx_to_dict(resource.cdx)
+ result["file_meta"] = file_meta
+ result["cdx"] = cdx_to_dict(resource.cdx)
if resource.revisit_cdx:
- result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
+ result["revisit_cdx"] = cdx_to_dict(resource.revisit_cdx)
if ingest_type == "pdf":
- if file_meta['mimetype'] != "application/pdf":
- result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
+ if file_meta["mimetype"] != "application/pdf":
+ result["status"] = "wrong-mimetype" # formerly: "other-mimetype"
return result
elif ingest_type == "xml":
- if file_meta['mimetype'] not in ("application/xml", "text/xml",
- "application/jats+xml"):
- result['status'] = "wrong-mimetype"
+ if file_meta["mimetype"] not in (
+ "application/xml",
+ "text/xml",
+ "application/jats+xml",
+ ):
+ result["status"] = "wrong-mimetype"
return result
elif ingest_type == "html":
- if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"):
- result['status'] = "wrong-mimetype"
+ if file_meta["mimetype"] not in ("text/html", "application/xhtml+xml"):
+ result["status"] = "wrong-mimetype"
return result
else:
- #raise NotImplementedError()
+ # raise NotImplementedError()
pass
- result['_html_biblio'] = html_biblio
- result['_resource'] = resource
+ result["_html_biblio"] = html_biblio
+ result["_resource"] = resource
return result
def process(self, request: dict, key: Any = None) -> dict:
- ingest_type = request.get('ingest_type')
- if ingest_type not in ("dataset", ):
+ ingest_type = request.get("ingest_type")
+ if ingest_type not in ("dataset",):
raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
# parse/clean URL
# note that we pass through the original/raw URL, and that is what gets
# persisted in database table
- base_url = clean_url(request['base_url'])
+ base_url = clean_url(request["base_url"])
- force_recrawl = bool(request.get('force_recrawl', False))
+ force_recrawl = bool(request.get("force_recrawl", False))
print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
# TODO: "existing" check against file and/or fileset ingest result table
- #existing = self.check_existing_ingest(ingest_type, base_url)
- #if existing:
+ # existing = self.check_existing_ingest(ingest_type, base_url)
+ # if existing:
# return self.process_existing(request, existing)
- result = self.fetch_resource_iteratively(ingest_type,
- base_url,
- force_recrawl=force_recrawl)
- result['request'] = request
- if result.get('status') is not None:
- result['request'] = request
+ result = self.fetch_resource_iteratively(
+ ingest_type, base_url, force_recrawl=force_recrawl
+ )
+ result["request"] = request
+ if result.get("status") is not None:
+ result["request"] = request
return result
- html_biblio = result.pop('_html_biblio')
- resource = result.pop('_resource')
+ html_biblio = result.pop("_html_biblio")
+ resource = result.pop("_resource")
# 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
@@ -280,166 +305,183 @@ class IngestFilesetWorker(IngestFileWorker):
break
if not platform_helper:
- result['status'] = 'no-platform-match'
+ result["status"] = "no-platform-match"
return result
# 2. Use platform-specific methods to fetch manifest metadata and decide on an `ingest_strategy`.
try:
dataset_meta = platform_helper.process_request(request, resource, html_biblio)
except PlatformScopeError as e:
- result['status'] = 'platform-scope'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "platform-scope"
+ result["error_message"] = str(e)[:1600]
return result
except PlatformRestrictedError as e:
- result['status'] = 'platform-restricted'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "platform-restricted"
+ result["error_message"] = str(e)[:1600]
return result
except NotImplementedError as e:
- result['status'] = 'not-implemented'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "not-implemented"
+ result["error_message"] = str(e)[:1600]
return result
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
- result['status'] = 'platform-404'
- result['error_message'] = str(e)[:1600]
+ result["status"] = "platform-404"
+ result["error_message"] = str(e)[:1600]
return result
else:
raise e
- #print(dataset_meta, file=sys.stderr)
+ # print(dataset_meta, file=sys.stderr)
platform = dataset_meta.platform_name
- result['platform_name'] = dataset_meta.platform_name
- result['platform_domain'] = dataset_meta.platform_domain
- result['platform_id'] = dataset_meta.platform_id
- result['platform_base_url'] = dataset_meta.web_base_url
- result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name
+ result["platform_name"] = dataset_meta.platform_name
+ result["platform_domain"] = dataset_meta.platform_domain
+ result["platform_id"] = dataset_meta.platform_id
+ result["platform_base_url"] = dataset_meta.web_base_url
+ result["archiveorg_item_name"] = dataset_meta.archiveorg_item_name
if not dataset_meta.manifest:
- result['status'] = 'empty-manifest'
+ result["status"] = "empty-manifest"
return result
# these will get confirmed/updated after ingest
- result['manifest'] = [m.dict(exclude_none=True) for m in dataset_meta.manifest]
- result['file_count'] = len(dataset_meta.manifest)
- result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size])
+ result["manifest"] = [m.dict(exclude_none=True) for m in dataset_meta.manifest]
+ result["file_count"] = len(dataset_meta.manifest)
+ result["total_size"] = sum([m.size for m in dataset_meta.manifest if m.size])
- if result['total_size'] > self.max_total_size:
- result['status'] = 'too-large-size'
+ if result["total_size"] > self.max_total_size:
+ result["status"] = "too-large-size"
return result
- if result['file_count'] > self.max_file_count:
+ if result["file_count"] > self.max_file_count:
# hard max, to prevent downstream breakage
- if result['file_count'] > 10 * 1000:
- result['manifest'] = result['manifest'][:self.max_file_count]
- result['status'] = 'too-many-files'
+ if result["file_count"] > 10 * 1000:
+ result["manifest"] = result["manifest"][: self.max_file_count]
+ result["status"] = "too-many-files"
return result
ingest_strategy = platform_helper.chose_strategy(dataset_meta)
- result['ingest_strategy'] = ingest_strategy
+ result["ingest_strategy"] = ingest_strategy
print(
f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}",
- file=sys.stderr)
+ file=sys.stderr,
+ )
strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy)
if not strategy_helper:
- result['status'] = 'no-strategy-helper'
+ result["status"] = "no-strategy-helper"
return result
# 3. Use strategy-specific methods to archive all files in platform manifest, and verify manifest metadata.
archive_result = strategy_helper.process(dataset_meta)
# 4. Summarize status and return structured result metadata.
- result['status'] = archive_result.status
- result['manifest'] = [m.dict(exclude_none=True) for m in archive_result.manifest]
+ result["status"] = archive_result.status
+ result["manifest"] = [m.dict(exclude_none=True) for m in archive_result.manifest]
- if ingest_strategy.endswith('-fileset-bundle'):
- result['fileset_bundle'] = dict()
+ if ingest_strategy.endswith("-fileset-bundle"):
+ result["fileset_bundle"] = dict()
if archive_result.bundle_file_meta:
- result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta
+ result["fileset_bundle"]["file_meta"] = archive_result.bundle_file_meta
if archive_result.bundle_archiveorg_path:
- result['fileset_bundle'][
- 'archiveorg_bundle_path'] = archive_result.bundle_archiveorg_path
+ result["fileset_bundle"][
+ "archiveorg_bundle_path"
+ ] = archive_result.bundle_archiveorg_path
if archive_result.bundle_resource:
- result['fileset_bundle']['terminal'] = dict(
+ result["fileset_bundle"]["terminal"] = dict(
terminal_url=archive_result.bundle_resource.terminal_url,
terminal_dt=archive_result.bundle_resource.terminal_dt,
terminal_status_code=archive_result.bundle_resource.terminal_status_code,
)
if archive_result.bundle_resource.cdx:
- result['fileset_bundle']['cdx'] = cdx_to_dict(
- archive_result.bundle_resource.cdx)
+ result["fileset_bundle"]["cdx"] = cdx_to_dict(
+ archive_result.bundle_resource.cdx
+ )
if archive_result.bundle_resource.revisit_cdx:
- result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(
- archive_result.bundle_resource.revisit_cdx)
+ result["fileset_bundle"]["revisit_cdx"] = cdx_to_dict(
+ archive_result.bundle_resource.revisit_cdx
+ )
- if ingest_strategy.endswith('-file'):
- result['fileset_file'] = dict()
+ if ingest_strategy.endswith("-file"):
+ result["fileset_file"] = dict()
if archive_result.file_file_meta:
- result['fileset_file']['file_meta'] = archive_result.file_file_meta,
+ result["fileset_file"]["file_meta"] = (archive_result.file_file_meta,)
if archive_result.file_resource:
- result['fileset_file']['terminal'] = dict(
+ result["fileset_file"]["terminal"] = dict(
terminal_url=archive_result.file_resource.terminal_url,
terminal_dt=archive_result.file_resource.terminal_dt,
terminal_status_code=archive_result.file_resource.terminal_status_code,
)
if archive_result.file_resource.cdx:
- result['fileset_file']['cdx'] = cdx_to_dict(
- archive_result.file_resource.cdx)
+ result["fileset_file"]["cdx"] = cdx_to_dict(
+ archive_result.file_resource.cdx
+ )
if archive_result.file_resource.revisit_cdx:
- result['fileset_file']['revisit_cdx'] = cdx_to_dict(
- archive_result.file_resource.revisit_cdx)
+ result["fileset_file"]["revisit_cdx"] = cdx_to_dict(
+ archive_result.file_resource.revisit_cdx
+ )
- if result['status'].startswith('success'):
+ if result["status"].startswith("success"):
# check that these are still valid
- assert result['file_count'] == len(archive_result.manifest)
- assert result['total_size'] == sum(
- [m.size for m in archive_result.manifest if m.size])
+ assert result["file_count"] == len(archive_result.manifest)
+ assert result["total_size"] == sum(
+ [m.size for m in archive_result.manifest if m.size]
+ )
- if result[
- 'status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta:
+ if (
+ result["status"] == "success-file"
+ and archive_result.file_resource
+ and archive_result.file_file_meta
+ ):
file_result: Dict[str, Any] = dict(
hit=True,
- status='success',
+ status="success",
request=request.copy(),
file_meta=archive_result.file_file_meta,
terminal=dict(
terminal_url=archive_result.file_resource.terminal_url,
terminal_dt=archive_result.file_resource.terminal_dt,
terminal_status_code=archive_result.file_resource.terminal_status_code,
- terminal_sha1hex=archive_result.file_file_meta['sha1hex'],
+ terminal_sha1hex=archive_result.file_file_meta["sha1hex"],
),
)
if archive_result.file_resource.cdx:
- file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
+ file_result["cdx"] = cdx_to_dict(archive_result.file_resource.cdx)
if archive_result.file_resource.revisit_cdx:
- file_result['revisit_cdx'] = cdx_to_dict(
- archive_result.file_resource.revisit_cdx)
- file_result['request']['ingest_type'] = request['ingest_type'] + "-file"
+ file_result["revisit_cdx"] = cdx_to_dict(
+ archive_result.file_resource.revisit_cdx
+ )
+ file_result["request"]["ingest_type"] = request["ingest_type"] + "-file"
# call the super() (ingest_file) version of process_hit()
- info = self.process_file_hit(file_result['request']['ingest_type'],
- archive_result.file_resource,
- archive_result.file_file_meta)
+ info = self.process_file_hit(
+ file_result["request"]["ingest_type"],
+ archive_result.file_resource,
+ archive_result.file_file_meta,
+ )
file_result.update(info)
if self.ingest_file_result_sink:
self.ingest_file_result_sink.push_record(result.copy())
elif self.ingest_file_result_stdout:
sys.stdout.write(json.dumps(file_result, sort_keys=True) + "\n")
- if result['status'].startswith('success'):
- result['hit'] = True
- print("[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format(
- ingest_type,
- result['file_count'],
- result['total_size'],
- ingest_strategy,
- ),
- file=sys.stderr)
+ if result["status"].startswith("success"):
+ result["hit"] = True
+ print(
+ "[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format(
+ ingest_type,
+ result["file_count"],
+ result["total_size"],
+ ingest_strategy,
+ ),
+ file=sys.stderr,
+ )
else:
- print("[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format(
- ingest_type,
- result['status'],
- result['file_count'],
- result['total_size'],
- ingest_strategy,
- ),
- file=sys.stderr)
+ print(
+ "[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format(
+ ingest_type,
+ result["status"],
+ result["file_count"],
+ result["total_size"],
+ ingest_strategy,
+ ),
+ file=sys.stderr,
+ )
return result
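
For orientation, a minimal sketch of the kind of result dict the process() method above assembles for a successful single-file strategy run. Only the key names follow the assignments visible in this diff; the concrete values and the "web-file" strategy name are illustrative assumptions, not values from the codebase.

# Illustrative only: approximate shape of a successful "-file" strategy result.
example_result = {
    "hit": True,
    "status": "success-file",
    "ingest_strategy": "web-file",  # assumed strategy name
    "file_count": 1,
    "total_size": 123456,
    "manifest": [{"path": "data.csv", "size": 123456}],  # manifest entry fields assumed
    "fileset_file": {
        "file_meta": {"sha1hex": "0" * 40, "mimetype": "text/csv", "size_bytes": 123456},
        "terminal": {
            "terminal_url": "https://example.com/data.csv",  # placeholder URL
            "terminal_dt": "20211027185017",
            "terminal_status_code": 200,
        },
    },
}
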
diff --git a/python/sandcrawler/ingest_html.py b/python/sandcrawler/ingest_html.py
index 91e5c6e..0ff7fe0 100644
--- a/python/sandcrawler/ingest_html.py
+++ b/python/sandcrawler/ingest_html.py
@@ -9,12 +9,26 @@ import pydantic
import trafilatura
from selectolax.parser import HTMLParser
-from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio,
- html_extract_resources, load_adblock_rules)
-from sandcrawler.ia import (CdxApiClient, NoCaptureError, WaybackClient, WaybackContentError,
- cdx_to_dict, fix_transfer_encoding)
-from sandcrawler.misc import (datetime_to_cdx, gen_file_metadata, parse_cdx_datetime,
- url_fuzzy_equal)
+from sandcrawler.html_metadata import (
+ BiblioMetadata,
+ html_extract_biblio,
+ html_extract_resources,
+ load_adblock_rules,
+)
+from sandcrawler.ia import (
+ CdxApiClient,
+ NoCaptureError,
+ WaybackClient,
+ WaybackContentError,
+ cdx_to_dict,
+ fix_transfer_encoding,
+)
+from sandcrawler.misc import (
+ datetime_to_cdx,
+ gen_file_metadata,
+ parse_cdx_datetime,
+ url_fuzzy_equal,
+)
TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}"
@@ -23,7 +37,7 @@ def html_extract_body_teixml(doc: bytes) -> dict:
try:
tei_xml = trafilatura.extract(
doc,
- output_format='xmltei',
+ output_format="xmltei",
include_comments=False,
include_formatting=True,
)
@@ -35,12 +49,11 @@ def html_extract_body_teixml(doc: bytes) -> dict:
if tei_xml:
body_txt = teixml_body_text(tei_xml)
word_count = len(body_txt.split())
- return dict(status="success",
- agent=TRAFILATURA_AGENT,
- tei_xml=tei_xml,
- word_count=word_count)
+ return dict(
+ status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count
+ )
elif doc.startswith(
- b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 2012"http://www.w3.org/TR/html4/loose.dtd">'
+ b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 2012"http://www.w3.org/TR/html4/loose.dtd">'
):
# hack for firstmonday.org
return html_extract_body_teixml(doc[106:])
@@ -51,7 +64,7 @@ def html_extract_body_teixml(doc: bytes) -> dict:
def teixml_body_text(doc_xml: str) -> str:
ns = {"tei": "http://www.tei-c.org/ns/1.0"}
tree = ET.fromstring(doc_xml)
- body = tree.find('.//tei:body', ns)
+ body = tree.find(".//tei:body", ns)
if body:
return " ".join(body.itertext())
else:
@@ -126,8 +139,9 @@ class HtmlMetaRow(pydantic.BaseModel):
)
-def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient,
- when: Optional[datetime.datetime]) -> List[WebResource]:
+def quick_fetch_html_resources(
+ resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime]
+) -> List[WebResource]:
"""
This is the lazy version that just does a CDX lookup for each resource.
@@ -138,12 +152,13 @@ def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient,
full = []
closest = when and datetime_to_cdx(when)
for resource in resources:
- cdx_row = cdx_client.lookup_best(resource['url'], closest=closest)
+ cdx_row = cdx_client.lookup_best(resource["url"], closest=closest)
if not cdx_row:
raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
- if cdx_row.url != resource['url'] and not url_fuzzy_equal(cdx_row.url, resource['url']):
- print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}",
- file=sys.stderr)
+ if cdx_row.url != resource["url"] and not url_fuzzy_equal(cdx_row.url, resource["url"]):
+ print(
+ f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr
+ )
if not cdx_row.status_code:
# TODO: fall back to a full fetch?
print(" WARN: skipping revisit record", file=sys.stderr)
@@ -158,14 +173,16 @@ def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient,
status_code=cdx_row.status_code,
size=None,
sha256hex=None,
- resource_type=resource['type'],
- ))
+ resource_type=resource["type"],
+ )
+ )
return full
-def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient,
- when: Optional[datetime.datetime]) -> List[WebResource]:
+def fetch_html_resources(
+ resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]
+) -> List[WebResource]:
"""
This is the full version which fetches each resource from wayback/petabox
and calculates additional hashes.
@@ -176,11 +193,11 @@ def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient,
full = []
closest = when and datetime_to_cdx(when)
for resource in resources:
- wayback_resp = wayback_client.lookup_resource(resource['url'], closest=closest)
- if not wayback_resp or wayback_resp.status != 'success':
+ wayback_resp = wayback_client.lookup_resource(resource["url"], closest=closest)
+ if not wayback_resp or wayback_resp.status != "success":
raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True)
- if file_meta['sha1hex'] != wayback_resp.cdx.sha1hex:
+ if file_meta["sha1hex"] != wayback_resp.cdx.sha1hex:
raise WaybackContentError(
f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}"
)
@@ -189,25 +206,27 @@ def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient,
surt=wayback_resp.cdx.surt,
timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime),
url=wayback_resp.cdx.url,
- sha1hex=file_meta['sha1hex'],
- mimetype=file_meta['mimetype'],
+ sha1hex=file_meta["sha1hex"],
+ mimetype=file_meta["mimetype"],
status_code=wayback_resp.cdx.status_code
or wayback_resp.revisit_cdx.status_code,
- size=file_meta['size_bytes'],
- sha256hex=file_meta['sha256hex'],
- resource_type=resource['type'],
- ))
+ size=file_meta["size_bytes"],
+ sha256hex=file_meta["sha256hex"],
+ resource_type=resource["type"],
+ )
+ )
return full
-def html_guess_platform(url: str, doc: HTMLParser,
- biblio: Optional[BiblioMetadata]) -> Optional[str]:
+def html_guess_platform(
+ url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]
+) -> Optional[str]:
generator: Optional[str] = None
generator_elem = doc.css_first("meta[name='generator']")
if generator_elem:
- generator = generator_elem.attrs['content']
+ generator = generator_elem.attrs["content"]
else:
generator_elem = doc.css_first("a[id='developedBy']")
if generator_elem:
@@ -226,7 +245,10 @@ def html_guess_platform(url: str, doc: HTMLParser,
return "ojs"
else:
try:
- if 'powered by <a target="blank" href="http://pkp.sfu.ca/ojs/">PKP OJS</a>' in doc.html:
+ if (
+ 'powered by <a target="blank" href="http://pkp.sfu.ca/ojs/">PKP OJS</a>'
+ in doc.html
+ ):
return "ojs"
if 'Powered by <a target="_blank" href="http://arphahub.com">' in doc.html:
return "arpha"
@@ -236,20 +258,21 @@ def html_guess_platform(url: str, doc: HTMLParser,
pass
icon_elem = doc.css_first("link[type='image/x-icon']")
- if icon_elem and 'href' in icon_elem.attrs:
- if 'journalssystem.com' in icon_elem.attrs['href']:
+ if icon_elem and "href" in icon_elem.attrs:
+ if "journalssystem.com" in icon_elem.attrs["href"]:
return "journalssystem.com"
- elif 'indexcopernicus.com' in icon_elem.attrs['href']:
+ elif "indexcopernicus.com" in icon_elem.attrs["href"]:
return "indexcopernicus"
- if 'scielo' in url:
+ if "scielo" in url:
return "scielo"
return None
-def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata],
- word_count: Optional[int]) -> str:
+def html_guess_scope(
+ url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]
+) -> str:
"""
This function tries to guess if an HTML document represents one of:
@@ -275,7 +298,7 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]
"""
# assert that this is a real URL
- assert url.count('/') >= 2
+ assert url.count("/") >= 2
# basic paywall and loginwall detection based on URL
if url.endswith("/cookieAbsent"):
@@ -293,7 +316,7 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]
return "blocked-captcha"
# is this the top-level URL of the domain? aka, no path?
- if url.count('/') <= 2 or (url.count('/') == 3) and url.endswith('/'):
+ if url.count("/") <= 2 or (url.count("/") == 3) and url.endswith("/"):
return "homepage-domain"
platform = html_guess_platform(url, doc, biblio)
@@ -340,7 +363,7 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]
if word_count is not None:
if word_count < 20:
return "stub"
- elif word_count > 500 and platform in ['wordpress', 'blogger']:
+ elif word_count > 500 and platform in ["wordpress", "blogger"]:
return "article-fulltext"
elif word_count > 1200:
return "article-fulltext"
@@ -348,9 +371,9 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]
return "unknown"
-def run_single(url: str,
- timestamp: Optional[str] = None,
- quick_mode: bool = False) -> IngestWebResult:
+def run_single(
+ url: str, timestamp: Optional[str] = None, quick_mode: bool = False
+) -> IngestWebResult:
adblock = load_adblock_rules()
wayback_client = WaybackClient()
@@ -368,7 +391,7 @@ def run_single(url: str,
file_meta = gen_file_metadata(html_resource.body)
file_meta, html_resource = fix_transfer_encoding(file_meta, html_resource)
- if file_meta['mimetype'] not in ("text/html", "text/xml"):
+ if file_meta["mimetype"] not in ("text/html", "text/xml"):
return IngestWebResult(
status="wrong-mimetype",
hit=False,
@@ -379,8 +402,8 @@ def run_single(url: str,
html_doc = HTMLParser(html_resource.body)
html_biblio = html_extract_biblio(url, html_doc)
html_body = html_extract_body_teixml(html_resource.body)
- html_scope = html_guess_scope(url, html_doc, html_biblio, html_body.get('word_count'))
- if html_scope not in ('article-fulltext', 'unknown'):
+ html_scope = html_guess_scope(url, html_doc, html_biblio, html_body.get("word_count"))
+ if html_scope not in ("article-fulltext", "unknown"):
return IngestWebResult(
status="wrong-scope",
hit=False,
@@ -397,8 +420,9 @@ def run_single(url: str,
full_resources: List[WebResource] = []
if quick_mode:
- full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client,
- when)
+ full_resources = quick_fetch_html_resources(
+ raw_resources, wayback_client.cdx_client, when
+ )
else:
full_resources = fetch_html_resources(raw_resources, wayback_client, when)
@@ -425,8 +449,9 @@ def main() -> None:
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
subparsers = parser.add_subparsers()
- sub = subparsers.add_parser("single",
- help="tries to ingest a single URL, dumps result to stdout")
+ sub = subparsers.add_parser(
+ "single", help="tries to ingest a single URL, dumps result to stdout"
+ )
sub.set_defaults(func="run_single")
sub.add_argument(
"url",
@@ -453,8 +478,8 @@ def main() -> None:
result = run_single(args.url, args.timestamp, args.quick_mode)
print(result.json(indent=2, exclude_none=True))
else:
- #func = getattr(wp, args.func)
- #func()
+ # func = getattr(wp, args.func)
+ # func()
raise NotImplementedError()
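
For reference, a hedged sketch of driving run_single() directly, much as main() above does, assuming the repo's python/ directory is on PYTHONPATH. The URL and timestamp are placeholders, and live wayback/CDX access is required for a real run.

from sandcrawler.ingest_html import run_single

# quick_mode only does CDX lookups for sub-resources instead of full fetches
result = run_single(
    "https://example.com/article.html",  # placeholder URL
    timestamp="20211027185017",          # optional CDX-style datetime, placeholder
    quick_mode=True,
)
print(result.json(indent=2, exclude_none=True))
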
diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py
index 1967ba3..d47ab89 100644
--- a/python/sandcrawler/minio.py
+++ b/python/sandcrawler/minio.py
@@ -6,11 +6,13 @@ import minio
class SandcrawlerMinioClient(object):
- def __init__(self,
- host_url: str,
- access_key: str,
- secret_key: str,
- default_bucket: Optional[str] = None):
+ def __init__(
+ self,
+ host_url: str,
+ access_key: str,
+ secret_key: str,
+ default_bucket: Optional[str] = None,
+ ):
"""
host is minio connection string (host:port)
access and secret key are as expected
@@ -46,13 +48,15 @@ class SandcrawlerMinioClient(object):
)
return obj_path
- def put_blob(self,
- folder: str,
- blob: Union[str, bytes],
- sha1hex: Optional[str] = None,
- extension: str = "",
- prefix: str = "",
- bucket: Optional[str] = None) -> Tuple[str, str]:
+ def put_blob(
+ self,
+ folder: str,
+ blob: Union[str, bytes],
+ sha1hex: Optional[str] = None,
+ extension: str = "",
+ prefix: str = "",
+ bucket: Optional[str] = None,
+ ) -> Tuple[str, str]:
"""
blob should be bytes
sha1hex is assumed to be sha1 of the blob itself; if not supplied it will be calculated
@@ -61,7 +65,7 @@ class SandcrawlerMinioClient(object):
filename is SHA1 with an optional file extension.
"""
if type(blob) == str:
- blob = blob.encode('utf-8')
+ blob = blob.encode("utf-8")
assert type(blob) == bytes
if not sha1hex:
h = hashlib.sha1()
@@ -72,13 +76,13 @@ class SandcrawlerMinioClient(object):
bucket = self.default_bucket
assert bucket
content_type = "application/octet-stream"
- if extension.endswith('.xml'):
+ if extension.endswith(".xml"):
content_type = "application/xml"
- if extension.endswith('.png'):
+ if extension.endswith(".png"):
content_type = "image/png"
- elif extension.endswith('.jpg') or extension.endswith('.jpeg'):
+ elif extension.endswith(".jpg") or extension.endswith(".jpeg"):
content_type = "image/jpeg"
- elif extension.endswith('.txt'):
+ elif extension.endswith(".txt"):
content_type = "text/plain"
self.mc.put_object(
bucket,
@@ -89,12 +93,14 @@ class SandcrawlerMinioClient(object):
)
return (bucket, obj_path)
- def get_blob(self,
- folder: str,
- sha1hex: str,
- extension: str = "",
- prefix: str = "",
- bucket: str = None) -> bytes:
+ def get_blob(
+ self,
+ folder: str,
+ sha1hex: str,
+ extension: str = "",
+ prefix: str = "",
+ bucket: str = None,
+ ) -> bytes:
"""
sha1hex is sha1 of the blob itself
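
A minimal usage sketch for SandcrawlerMinioClient as reformatted above; the endpoint, credentials, and bucket name are placeholders, and a reachable minio/S3 service is assumed.

import hashlib

from sandcrawler.minio import SandcrawlerMinioClient

client = SandcrawlerMinioClient(
    host_url="localhost:9000",    # minio connection string (host:port), placeholder
    access_key="minioadmin",      # placeholder credentials
    secret_key="minioadmin",
    default_bucket="sandcrawler", # placeholder bucket
)
blob = b"<TEI>...</TEI>"
sha1hex = hashlib.sha1(blob).hexdigest()
bucket, obj_path = client.put_blob(
    folder="grobid",
    blob=blob,
    sha1hex=sha1hex,       # computed from the blob if not supplied
    extension=".tei.xml",  # drives the stored content-type (application/xml here)
)
fetched = client.get_blob(folder="grobid", sha1hex=sha1hex, extension=".tei.xml")
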
diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py
index 1c779ce..db001dd 100644
--- a/python/sandcrawler/misc.py
+++ b/python/sandcrawler/misc.py
@@ -15,7 +15,7 @@ def clean_url(s: str) -> str:
s = s.strip()
parsed = urlcanon.parse_url(s)
if not parsed.port and parsed.colon_before_port:
- parsed.colon_before_port = b''
+ parsed.colon_before_port = b""
return str(urlcanon.whatwg(parsed))
@@ -23,10 +23,12 @@ def url_fuzzy_equal(left: str, right: str) -> bool:
"""
TODO: use proper surt library and canonicalization for this check
"""
- fuzzy_left = '://'.join(
- clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:])
- fuzzy_right = '://'.join(
- clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:])
+ fuzzy_left = "://".join(
+ clean_url(left).replace("www.", "").replace(":80/", "/").split("://")[1:]
+ )
+ fuzzy_right = "://".join(
+ clean_url(right).replace("www.", "").replace(":80/", "/").split("://")[1:]
+ )
if fuzzy_left == fuzzy_right:
return True
elif fuzzy_left == fuzzy_right + "/" or fuzzy_right == fuzzy_left + "/":
@@ -35,10 +37,13 @@ def url_fuzzy_equal(left: str, right: str) -> bool:
def test_url_fuzzy_equal() -> None:
- assert url_fuzzy_equal(
- "http://www.annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree",
- "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree"
- ) is True
+ assert (
+ url_fuzzy_equal(
+ "http://www.annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree",
+ "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree",
+ )
+ is True
+ )
def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
@@ -53,7 +58,7 @@ def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
if len(blob) < 1024 * 1024:
mimetype = magic.Magic(mime=True).from_buffer(blob)
else:
- mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024 * 1024)])
+ mimetype = magic.Magic(mime=True).from_buffer(blob[: (1024 * 1024)])
if mimetype in ("application/xml", "text/xml"):
# crude checks for XHTML or JATS XML, using only first 1 kB of file
if b"<htm" in blob[:1024] and b'xmlns="http://www.w3.org/1999/xhtml"' in blob[:1024]:
@@ -83,10 +88,13 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict:
assert path is not None
mimetype = magic.Magic(mime=True).from_file(path)
if mimetype in ("application/xml", "text/xml"):
- with open(path, 'rb') as f:
+ with open(path, "rb") as f:
blob = f.read(1024)
# crude checks for XHTML or JATS XML, using only first 1 kB of file
- if b"<htm" in blob[:1024] and b'xmlns="http://www.w3.org/1999/xhtml"' in blob[:1024]:
+ if (
+ b"<htm" in blob[:1024]
+ and b'xmlns="http://www.w3.org/1999/xhtml"' in blob[:1024]
+ ):
mimetype = "application/xhtml+xml"
elif b"<article " in blob[:1024] and b"<html" not in blob[:1024]:
mimetype = "application/jats+xml"
@@ -96,7 +104,7 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict:
hashlib.md5(),
]
size_bytes = 0
- with open(path, 'rb') as f:
+ with open(path, "rb") as f:
while True:
chunk = f.read(1024 * 1024)
if not chunk:
@@ -128,15 +136,15 @@ def b32_hex(s: str) -> str:
if len(s) == 40:
return s
raise ValueError("not a base-32 encoded SHA-1 hash: {}".format(s))
- return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
+ return base64.b16encode(base64.b32decode(s.upper())).lower().decode("utf-8")
NORMAL_MIME = (
- 'application/pdf',
- 'application/postscript',
- 'text/html',
- 'text/xml',
- 'application/octet-stream',
+ "application/pdf",
+ "application/postscript",
+ "text/html",
+ "text/xml",
+ "application/octet-stream",
)
@@ -147,22 +155,22 @@ def normalize_mime(raw: str) -> Optional[str]:
return norm
# Special cases
- if raw.startswith('application/xml'):
- return 'text/xml'
- if raw.startswith('application/x-pdf'):
- return 'application/pdf'
- if raw in ('.pdf', ):
- return 'application/pdf'
+ if raw.startswith("application/xml"):
+ return "text/xml"
+ if raw.startswith("application/x-pdf"):
+ return "application/pdf"
+ if raw in (".pdf",):
+ return "application/pdf"
if raw in (
- 'application/download',
- 'binary/octet-stream',
- 'unk',
- 'application/x-download',
- 'application/octetstream',
- 'application/force-download',
- 'application/unknown',
+ "application/download",
+ "binary/octet-stream",
+ "unk",
+ "application/x-download",
+ "application/octetstream",
+ "application/force-download",
+ "application/unknown",
):
- return 'application/octet-stream'
+ return "application/octet-stream"
return None
@@ -200,14 +208,19 @@ def parse_cdx_line(raw_cdx: str, normalize: bool = True) -> Optional[dict]:
offset = cdx[9]
warc = cdx[10]
- if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit() and len(sha1b32) == 32
- and dt.isdigit()):
+ if not (
+ sha1b32.isalnum()
+ and c_size.isdigit()
+ and offset.isdigit()
+ and len(sha1b32) == 32
+ and dt.isdigit()
+ ):
return None
- if '-' in (surt, dt, url, http_status, sha1b32, c_size, offset, warc):
+ if "-" in (surt, dt, url, http_status, sha1b32, c_size, offset, warc):
return None
- if mime is None or mime == '-':
+ if mime is None or mime == "-":
mime = "application/octet-stream"
if normalize:
@@ -242,16 +255,13 @@ def test_parse_cdx_datetime() -> None:
assert parse_cdx_datetime("") is None
assert parse_cdx_datetime("asdf") is None
assert parse_cdx_datetime("19930203123045") is not None
- assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020,
- month=10,
- day=28,
- hour=23,
- minute=51,
- second=3)
+ assert parse_cdx_datetime("20201028235103") == datetime.datetime(
+ year=2020, month=10, day=28, hour=23, minute=51, second=3
+ )
def datetime_to_cdx(dt: datetime.datetime) -> str:
- return '%04d%02d%02d%02d%02d%02d' % (
+ return "%04d%02d%02d%02d%02d%02d" % (
dt.year,
dt.month,
dt.day,
@@ -263,13 +273,16 @@ def datetime_to_cdx(dt: datetime.datetime) -> str:
def test_datetime_to_cdx() -> None:
assert "20201028235103" == datetime_to_cdx(
- datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3))
+ datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)
+ )
-def requests_retry_session(retries: int = 10,
- backoff_factor: int = 3,
- status_forcelist: List[int] = [500, 502, 504],
- session: requests.Session = None) -> requests.Session:
+def requests_retry_session(
+ retries: int = 10,
+ backoff_factor: int = 3,
+ status_forcelist: List[int] = [500, 502, 504],
+ session: requests.Session = None,
+) -> requests.Session:
"""
From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
"""
@@ -282,8 +295,8 @@ def requests_retry_session(retries: int = 10,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
- session.mount('http://', adapter)
- session.mount('https://', adapter)
+ session.mount("http://", adapter)
+ session.mount("https://", adapter)
return session
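
A small sketch of requests_retry_session() in use, matching the signature above; the target URL is a placeholder and network access is assumed.

from sandcrawler.misc import requests_retry_session

# session retries transient 500/502/504 responses with backoff
session = requests_retry_session(retries=3, backoff_factor=1)
resp = session.get("https://example.com/", timeout=10.0)  # placeholder URL
print(resp.status_code)
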
diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py
index 1d306d3..6c18395 100644
--- a/python/sandcrawler/pdfextract.py
+++ b/python/sandcrawler/pdfextract.py
@@ -173,64 +173,69 @@ class PdfExtractResult:
Outputs a JSON string as would be published to Kafka text/info topic.
"""
return {
- 'key': self.sha1hex,
- 'sha1hex': self.sha1hex,
- 'status': self.status,
- 'file_meta': self.file_meta,
- 'error_msg': self.error_msg,
- 'text': self.text,
- 'has_page0_thumbnail': self.has_page0_thumbnail,
- 'meta_xml': self.meta_xml,
- 'pdf_info': self.pdf_info,
- 'pdf_extra': self.pdf_extra,
- 'source': self.source,
+ "key": self.sha1hex,
+ "sha1hex": self.sha1hex,
+ "status": self.status,
+ "file_meta": self.file_meta,
+ "error_msg": self.error_msg,
+ "text": self.text,
+ "has_page0_thumbnail": self.has_page0_thumbnail,
+ "meta_xml": self.meta_xml,
+ "pdf_info": self.pdf_info,
+ "pdf_extra": self.pdf_extra,
+ "source": self.source,
}
@staticmethod
- def from_pdftext_dict(record: Dict[str, Any]) -> 'PdfExtractResult':
+ def from_pdftext_dict(record: Dict[str, Any]) -> "PdfExtractResult":
"""
Outputs a JSON string as would be published to Kafka text/info topic.
"""
- if record['status'] != 'success':
+ if record["status"] != "success":
return PdfExtractResult(
- sha1hex=record.get('sha1hex') or record['key'],
- status=record['status'],
- error_msg=record.get('error_msg'),
+ sha1hex=record.get("sha1hex") or record["key"],
+ status=record["status"],
+ error_msg=record.get("error_msg"),
)
else:
return PdfExtractResult(
- sha1hex=record['sha1hex'],
- status=record['status'],
- file_meta=record.get('file_meta'),
- text=record.get('text'),
- has_page0_thumbnail=bool(record.get('has_page0_thumbnail', False)),
- meta_xml=record.get('meta_xml'),
- pdf_info=record.get('pdf_info'),
- pdf_extra=record.get('pdf_extra'),
+ sha1hex=record["sha1hex"],
+ status=record["status"],
+ file_meta=record.get("file_meta"),
+ text=record.get("text"),
+ has_page0_thumbnail=bool(record.get("has_page0_thumbnail", False)),
+ meta_xml=record.get("meta_xml"),
+ pdf_info=record.get("pdf_info"),
+ pdf_extra=record.get("pdf_extra"),
)
@staticmethod
- def from_pdf_meta_dict(record: Dict[str, Any]) -> 'PdfExtractResult':
+ def from_pdf_meta_dict(record: Dict[str, Any]) -> "PdfExtractResult":
"""
Parses what would be returned from postgrest
"""
- if record['status'] != 'success':
+ if record["status"] != "success":
return PdfExtractResult(
- sha1hex=record['sha1hex'],
- status=record['status'],
- error_msg=(record.get('metadata') or {}).get('error_msg'),
+ sha1hex=record["sha1hex"],
+ status=record["status"],
+ error_msg=(record.get("metadata") or {}).get("error_msg"),
)
else:
pdf_extra = dict()
- for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id',
- 'pdf_version'):
+ for k in (
+ "page_count",
+ "page0_height",
+ "page0_width",
+ "permanent_id",
+ "pdf_version",
+ ):
if record.get(k):
pdf_extra[k] = record[k]
return PdfExtractResult(
- sha1hex=record['sha1hex'],
- status=record['status'],
- has_page0_thumbnail=bool(record.get('has_page0_thumbnail', False)),
- pdf_info=record.get('metadata'),
+ sha1hex=record["sha1hex"],
+ status=record["status"],
+ has_page0_thumbnail=bool(record.get("has_page0_thumbnail", False)),
+ pdf_info=record.get("metadata"),
pdf_extra=pdf_extra,
)
@@ -247,11 +252,11 @@ class PdfExtractResult:
# TODO: form, encrypted
if self.pdf_info:
metadata = dict()
- for k in ('Title', 'Subject', 'Author', 'Creator', 'Producer', 'doi'):
+ for k in ("Title", "Subject", "Author", "Creator", "Producer", "doi"):
if k in self.pdf_info:
metadata[k.lower()] = self.pdf_info[k]
- if 'CreationDate' in self.pdf_info:
- pdf_created = self.pdf_info['CreationDate']
+ if "CreationDate" in self.pdf_info:
+ pdf_created = self.pdf_info["CreationDate"]
metadata_json: Optional[str] = None
if metadata:
metadata_json = json.dumps(metadata, sort_keys=True)
@@ -260,20 +265,20 @@ class PdfExtractResult:
datetime.datetime.now(), # updated
self.status,
self.has_page0_thumbnail,
- pdf_extra.get('page_count'),
+ pdf_extra.get("page_count"),
word_count,
- pdf_extra.get('page0_height'),
- pdf_extra.get('page0_width'),
- pdf_extra.get('permanent_id'),
+ pdf_extra.get("page0_height"),
+ pdf_extra.get("page0_width"),
+ pdf_extra.get("permanent_id"),
pdf_created,
- pdf_extra.get('pdf_version'),
+ pdf_extra.get("pdf_version"),
metadata_json,
)
-def process_pdf(blob: bytes,
- thumb_size: Tuple[int, int] = (180, 300),
- thumb_type: str = "JPEG") -> PdfExtractResult:
+def process_pdf(
+ blob: bytes, thumb_size: Tuple[int, int] = (180, 300), thumb_type: str = "JPEG"
+) -> PdfExtractResult:
"""
A known issue is that output text is in "physical layout" mode, which means
columns will be side-by-side. We would prefer a single stream of tokens!
@@ -283,11 +288,11 @@ def process_pdf(blob: bytes,
didn't seem to work at all (returned empty strings).
"""
file_meta = gen_file_metadata(blob)
- sha1hex = file_meta['sha1hex']
- if file_meta['mimetype'] != 'application/pdf':
+ sha1hex = file_meta["sha1hex"]
+ if file_meta["mimetype"] != "application/pdf":
return PdfExtractResult(
sha1hex=sha1hex,
- status='not-pdf',
+ status="not-pdf",
error_msg=f"mimetype is '{file_meta['mimetype']}'",
file_meta=file_meta,
)
@@ -295,7 +300,7 @@ def process_pdf(blob: bytes,
if sha1hex in BAD_PDF_SHA1HEX:
return PdfExtractResult(
sha1hex=sha1hex,
- status='bad-pdf',
+ status="bad-pdf",
error_msg="PDF known to cause processing issues",
file_meta=file_meta,
)
@@ -306,7 +311,7 @@ def process_pdf(blob: bytes,
if pdf is None:
return PdfExtractResult(
sha1hex=sha1hex,
- status='empty-pdf',
+ status="empty-pdf",
file_meta=file_meta,
has_page0_thumbnail=False,
)
@@ -314,7 +319,7 @@ def process_pdf(blob: bytes,
if page0 is None:
return PdfExtractResult(
sha1hex=sha1hex,
- status='empty-page0',
+ status="empty-page0",
file_meta=file_meta,
)
    # this call sometimes fails and returns an AttributeError
@@ -324,7 +329,7 @@ def process_pdf(blob: bytes,
# starting with a narrow set
return PdfExtractResult(
sha1hex=sha1hex,
- status='parse-error',
+ status="parse-error",
error_msg=str(e),
file_meta=file_meta,
)
@@ -334,8 +339,9 @@ def process_pdf(blob: bytes,
renderer = poppler.PageRenderer()
try:
full_img = renderer.render_page(page0)
- img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw',
- "BGRA", 0, 1)
+ img = Image.frombuffer(
+ "RGBA", (full_img.width, full_img.height), full_img.data, "raw", "BGRA", 0, 1
+ )
img.thumbnail(thumb_size, Image.BICUBIC)
buf = BytesIO()
img.save(buf, thumb_type)
@@ -355,7 +361,7 @@ def process_pdf(blob: bytes,
except AttributeError as e:
return PdfExtractResult(
sha1hex=sha1hex,
- status='parse-error',
+ status="parse-error",
error_msg=str(e),
file_meta=file_meta,
)
@@ -364,14 +370,14 @@ def process_pdf(blob: bytes,
if len(full_text) > 1000000:
return PdfExtractResult(
sha1hex=sha1hex,
- status='text-too-large',
+ status="text-too-large",
error_msg="full_text chars: {}".format(len(full_text)),
file_meta=file_meta,
)
if len(pdf.metadata) > 1000000:
return PdfExtractResult(
sha1hex=sha1hex,
- status='text-too-large',
+ status="text-too-large",
error_msg="meta_xml chars: {}".format(len(full_text)),
file_meta=file_meta,
)
@@ -381,7 +387,7 @@ def process_pdf(blob: bytes,
except UnicodeDecodeError:
return PdfExtractResult(
sha1hex=sha1hex,
- status='bad-unicode',
+ status="bad-unicode",
error_msg="in infos()",
file_meta=file_meta,
)
@@ -402,7 +408,7 @@ def process_pdf(blob: bytes,
return PdfExtractResult(
sha1hex=sha1hex,
file_meta=file_meta,
- status='success',
+ status="success",
error_msg=None,
text=full_text or None,
has_page0_thumbnail=page0_thumbnail is not None,
@@ -421,17 +427,19 @@ def process_pdf(blob: bytes,
class PdfExtractWorker(SandcrawlerFetchWorker):
- def __init__(self,
- wayback_client: Optional[WaybackClient] = None,
- sink: Optional[SandcrawlerWorker] = None,
- **kwargs):
+ def __init__(
+ self,
+ wayback_client: Optional[WaybackClient] = None,
+ sink: Optional[SandcrawlerWorker] = None,
+ **kwargs,
+ ):
super().__init__(wayback_client=wayback_client)
self.wayback_client = wayback_client
self.sink = sink
- self.thumbnail_sink = kwargs.get('thumbnail_sink')
+ self.thumbnail_sink = kwargs.get("thumbnail_sink")
def timeout_response(self, task: Dict[str, Any]) -> Dict[str, Any]:
- default_key = task['sha1hex']
+ default_key = task["sha1hex"]
return dict(
status="error-timeout",
error_msg="internal pdf-extract worker timeout",
@@ -441,9 +449,9 @@ class PdfExtractWorker(SandcrawlerFetchWorker):
def process(self, record: Any, key: Optional[str] = None) -> dict:
fetch_result = self.fetch_blob(record)
- if fetch_result['status'] != 'success':
+ if fetch_result["status"] != "success":
return fetch_result
- blob: bytes = fetch_result['blob']
+ blob: bytes = fetch_result["blob"]
assert blob and isinstance(blob, bytes)
result = process_pdf(blob)
@@ -458,10 +466,11 @@ class PdfExtractBlobWorker(SandcrawlerWorker):
This is sort of like PdfExtractWorker, except it receives blobs directly,
instead of fetching blobs from some remote store.
"""
+
def __init__(self, sink: Optional[SandcrawlerWorker] = None, **kwargs):
super().__init__()
self.sink = sink
- self.thumbnail_sink = kwargs.get('thumbnail_sink')
+ self.thumbnail_sink = kwargs.get("thumbnail_sink")
def process(self, blob: Any, key: Optional[str] = None) -> Any:
if not blob:
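
A hedged sketch of calling process_pdf() on a local blob; the file path is a placeholder, and the poppler bindings plus Pillow are assumed to be installed for text extraction and thumbnail rendering.

from sandcrawler.pdfextract import process_pdf

with open("paper.pdf", "rb") as f:  # placeholder path
    blob = f.read()

result = process_pdf(blob, thumb_size=(180, 300), thumb_type="JPEG")
print(result.status)  # e.g. "success", "not-pdf", "parse-error"
if result.status == "success" and result.text:
    print(result.text[:200])  # first part of the extracted text
print("page0 thumbnail:", result.has_page0_thumbnail)
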
diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py
index 138e65c..d765164 100644
--- a/python/sandcrawler/pdftrio.py
+++ b/python/sandcrawler/pdftrio.py
@@ -32,37 +32,37 @@ class PdfTrioClient(object):
pdftrio_response = requests.post(
self.host_url + "/classify/research-pub/" + mode,
files={
- 'pdf_content': blob,
+ "pdf_content": blob,
},
timeout=60.0,
)
except requests.Timeout:
return {
- 'status': 'error-timeout',
- 'status_code': -4, # heritrix3 "HTTP timeout" code
- 'error_msg': 'pdftrio request (HTTP POST) timeout',
+ "status": "error-timeout",
+ "status_code": -4, # heritrix3 "HTTP timeout" code
+ "error_msg": "pdftrio request (HTTP POST) timeout",
}
except requests.exceptions.ConnectionError:
# crude back-off
time.sleep(2.0)
return {
- 'status': 'error-connect',
- 'status_code': -2, # heritrix3 "HTTP connect" code
-                'error_msg': 'pdftrio request connection timeout',
+ "status": "error-connect",
+ "status_code": -2, # heritrix3 "HTTP connect" code
+ "error_msg": "pdftrio request connection timout",
}
info: Dict[str, Any] = dict(status_code=pdftrio_response.status_code)
if pdftrio_response.status_code == 200:
resp_json = pdftrio_response.json()
- assert 'ensemble_score' in resp_json
- assert 'status' in resp_json
- assert 'versions' in resp_json
+ assert "ensemble_score" in resp_json
+ assert "status" in resp_json
+ assert "versions" in resp_json
info.update(resp_json)
else:
- info['status'] = 'error'
+ info["status"] = "error"
# TODO: might return JSON with some info?
- info['_total_sec'] = pdftrio_response.elapsed.total_seconds()
+ info["_total_sec"] = pdftrio_response.elapsed.total_seconds()
return info
@@ -70,11 +70,14 @@ class PdfTrioWorker(SandcrawlerFetchWorker):
"""
This class is basically copied directly from GrobidWorker
"""
- def __init__(self,
- pdftrio_client: PdfTrioClient,
- wayback_client: Optional[WaybackClient] = None,
- sink: Optional[SandcrawlerWorker] = None,
- **kwargs):
+
+ def __init__(
+ self,
+ pdftrio_client: PdfTrioClient,
+ wayback_client: Optional[WaybackClient] = None,
+ sink: Optional[SandcrawlerWorker] = None,
+ **kwargs
+ ):
super().__init__(wayback_client=wayback_client, **kwargs)
self.pdftrio_client = pdftrio_client
self.sink = sink
@@ -86,22 +89,22 @@ class PdfTrioWorker(SandcrawlerFetchWorker):
start = time.time()
fetch_result = self.fetch_blob(record)
fetch_sec = time.time() - start
- if fetch_result['status'] != 'success':
+ if fetch_result["status"] != "success":
return fetch_result
- blob: bytes = fetch_result['blob']
+ blob: bytes = fetch_result["blob"]
assert blob and isinstance(blob, bytes)
result = dict()
- result['file_meta'] = gen_file_metadata(blob)
- result['key'] = result['file_meta']['sha1hex']
- result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob)
- result['source'] = record
- result['timing'] = dict(
- pdftrio_sec=result['pdf_trio'].pop('_total_sec', None),
+ result["file_meta"] = gen_file_metadata(blob)
+ result["key"] = result["file_meta"]["sha1hex"]
+ result["pdf_trio"] = self.pdftrio_client.classify_pdf(blob)
+ result["source"] = record
+ result["timing"] = dict(
+ pdftrio_sec=result["pdf_trio"].pop("_total_sec", None),
total_sec=time.time() - start_process,
)
if fetch_sec:
- result['timing']['fetch_sec'] = fetch_sec
+ result["timing"]["fetch_sec"] = fetch_sec
return result
@@ -110,11 +113,14 @@ class PdfTrioBlobWorker(SandcrawlerWorker):
This is sort of like PdfTrioWorker, except it receives blobs directly,
instead of fetching blobs from some remote store.
"""
- def __init__(self,
- pdftrio_client: PdfTrioClient,
- sink: Optional[SandcrawlerWorker] = None,
- mode: str = "auto",
- **kwargs):
+
+ def __init__(
+ self,
+ pdftrio_client: PdfTrioClient,
+ sink: Optional[SandcrawlerWorker] = None,
+ mode: str = "auto",
+ **kwargs
+ ):
super().__init__(**kwargs)
self.pdftrio_client = pdftrio_client
self.sink = sink
@@ -126,11 +132,11 @@ class PdfTrioBlobWorker(SandcrawlerWorker):
return None
assert isinstance(blob, bytes)
result = dict()
- result['file_meta'] = gen_file_metadata(blob)
- result['key'] = result['file_meta']['sha1hex']
- result['pdf_trio'] = self.pdftrio_client.classify_pdf(blob, mode=self.mode)
- result['timing'] = dict(
- pdftrio_sec=result['pdf_trio'].pop('_total_sec', None),
+ result["file_meta"] = gen_file_metadata(blob)
+ result["key"] = result["file_meta"]["sha1hex"]
+ result["pdf_trio"] = self.pdftrio_client.classify_pdf(blob, mode=self.mode)
+ result["timing"] = dict(
+ pdftrio_sec=result["pdf_trio"].pop("_total_sec", None),
total_sec=time.time() - start_process,
)
return result
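
A sketch of classifying one PDF blob with PdfTrioClient; the constructor keyword and service URL are assumptions (only host_url usage is visible in this diff), and the file path is a placeholder.

from sandcrawler.pdftrio import PdfTrioClient

client = PdfTrioClient(host_url="http://localhost:3939")  # assumed kwarg and placeholder URL
with open("paper.pdf", "rb") as f:                        # placeholder path
    blob = f.read()

info = client.classify_pdf(blob, mode="auto")
print(info["status"], info["status_code"])
if info["status_code"] == 200:
    print("ensemble_score:", info.get("ensemble_score"))
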
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 8ec5979..f7954b1 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -41,14 +41,14 @@ class PersistCdxWorker(SandcrawlerWorker):
raise NotImplementedError
def push_batch(self, batch: list) -> list:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
# filter to full CDX lines, no liveweb
- cdx_batch = [r for r in batch if r.get('warc_path') and ("/" in r['warc_path'])]
+ cdx_batch = [r for r in batch if r.get("warc_path") and ("/" in r["warc_path"])]
resp = self.db.insert_cdx(self.cur, cdx_batch)
if len(cdx_batch) < len(batch):
- self.counts['skip'] += len(batch) - len(cdx_batch)
- self.counts['insert-cdx'] += resp[0]
- self.counts['update-cdx'] += resp[1]
+ self.counts["skip"] += len(batch) - len(cdx_batch)
+ self.counts["insert-cdx"] += resp[0]
+ self.counts["update-cdx"] += resp[1]
self.db.commit()
return []
@@ -70,47 +70,52 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if there is a problem with conversion, return None
"""
# backwards compat hacks; transform request to look like current schema
- if raw.get('ingest_type') == 'file':
- raw['ingest_type'] = 'pdf'
- if (not raw.get('link_source') and raw.get('base_url')
- and raw.get('ext_ids', {}).get('doi')
- and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])):
+ if raw.get("ingest_type") == "file":
+ raw["ingest_type"] = "pdf"
+ if (
+ not raw.get("link_source")
+ and raw.get("base_url")
+ and raw.get("ext_ids", {}).get("doi")
+ and raw["base_url"] == "https://doi.org/{}".format(raw["ext_ids"]["doi"])
+ ):
# set link_source(_id) for old ingest requests
- raw['link_source'] = 'doi'
- raw['link_source_id'] = raw['ext_ids']['doi']
- if (not raw.get('link_source')
- and raw.get('ingest_request_source', '').startswith('savepapernow')
- and raw.get('fatcat', {}).get('release_ident')):
+ raw["link_source"] = "doi"
+ raw["link_source_id"] = raw["ext_ids"]["doi"]
+ if (
+ not raw.get("link_source")
+ and raw.get("ingest_request_source", "").startswith("savepapernow")
+ and raw.get("fatcat", {}).get("release_ident")
+ ):
# set link_source(_id) for old ingest requests
- raw['link_source'] = 'spn'
- raw['link_source_id'] = raw['fatcat']['release_ident']
+ raw["link_source"] = "spn"
+ raw["link_source_id"] = raw["fatcat"]["release_ident"]
- for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'):
+ for k in ("ingest_type", "base_url", "link_source", "link_source_id"):
if k not in raw:
- self.counts['skip-request-fields'] += 1
+ self.counts["skip-request-fields"] += 1
return None
- if raw['ingest_type'] not in ('pdf', 'xml', 'html'):
- self.counts['skip-ingest-type'] += 1
+ if raw["ingest_type"] not in ("pdf", "xml", "html"):
+ self.counts["skip-ingest-type"] += 1
return None
request = {
- 'ingest_type': raw['ingest_type'],
- 'base_url': raw['base_url'],
- 'link_source': raw['link_source'],
- 'link_source_id': raw['link_source_id'],
- 'ingest_request_source': raw.get('ingest_request_source'),
- 'request': {},
+ "ingest_type": raw["ingest_type"],
+ "base_url": raw["base_url"],
+ "link_source": raw["link_source"],
+ "link_source_id": raw["link_source_id"],
+ "ingest_request_source": raw.get("ingest_request_source"),
+ "request": {},
}
# extra/optional fields
- if raw.get('release_stage'):
- request['release_stage'] = raw['release_stage']
- if raw.get('fatcat', {}).get('release_ident'):
- request['request']['release_ident'] = raw['fatcat']['release_ident']
- for k in ('ext_ids', 'edit_extra', 'rel'):
+ if raw.get("release_stage"):
+ request["release_stage"] = raw["release_stage"]
+ if raw.get("fatcat", {}).get("release_ident"):
+ request["request"]["release_ident"] = raw["fatcat"]["release_ident"]
+ for k in ("ext_ids", "edit_extra", "rel"):
if raw.get(k):
- request['request'][k] = raw[k]
+ request["request"][k] = raw[k]
# if this dict is empty, trim it to save DB space
- if not request['request']:
- request['request'] = None
+ if not request["request"]:
+ request["request"] = None
return request
def file_result_to_row(self, raw: dict) -> Optional[dict]:
@@ -119,59 +124,68 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if there is a problem with conversion, return None and set skip count
"""
- for k in ('request', 'hit', 'status'):
+ for k in ("request", "hit", "status"):
if k not in raw:
- self.counts['skip-result-fields'] += 1
+ self.counts["skip-result-fields"] += 1
return None
- if 'base_url' not in raw['request']:
- self.counts['skip-result-fields'] += 1
+ if "base_url" not in raw["request"]:
+ self.counts["skip-result-fields"] += 1
return None
- ingest_type = raw['request'].get('ingest_type')
- if ingest_type == 'file':
- ingest_type = 'pdf'
- if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset',
- 'dataset-file'):
- self.counts['skip-ingest-type'] += 1
+ ingest_type = raw["request"].get("ingest_type")
+ if ingest_type == "file":
+ ingest_type = "pdf"
+ if ingest_type not in (
+ "pdf",
+ "xml",
+ "html",
+ "component",
+ "src",
+ "dataset",
+ "dataset-file",
+ ):
+ self.counts["skip-ingest-type"] += 1
return None
- if raw['status'] in ("existing", ):
- self.counts['skip-existing'] += 1
+ if raw["status"] in ("existing",):
+ self.counts["skip-existing"] += 1
return None
result = {
- 'ingest_type': ingest_type,
- 'base_url': raw['request']['base_url'],
- 'hit': raw['hit'],
- 'status': raw['status'],
+ "ingest_type": ingest_type,
+ "base_url": raw["request"]["base_url"],
+ "hit": raw["hit"],
+ "status": raw["status"],
}
- terminal = raw.get('terminal')
+ terminal = raw.get("terminal")
if terminal:
- result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url')
- result['terminal_dt'] = terminal.get('terminal_dt')
- result['terminal_status_code'] = terminal.get(
- 'terminal_status_code') or terminal.get('status_code') or terminal.get(
- 'http_code')
- if result['terminal_status_code']:
- result['terminal_status_code'] = int(result['terminal_status_code'])
- result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
- if len(result['terminal_url']) > 2048:
+ result["terminal_url"] = terminal.get("terminal_url") or terminal.get("url")
+ result["terminal_dt"] = terminal.get("terminal_dt")
+ result["terminal_status_code"] = (
+ terminal.get("terminal_status_code")
+ or terminal.get("status_code")
+ or terminal.get("http_code")
+ )
+ if result["terminal_status_code"]:
+ result["terminal_status_code"] = int(result["terminal_status_code"])
+ result["terminal_sha1hex"] = terminal.get("terminal_sha1hex")
+ if len(result["terminal_url"]) > 2048:
# postgresql13 doesn't like extremely large URLs in b-tree index
- self.counts['skip-huge-url'] += 1
+ self.counts["skip-huge-url"] += 1
return None
return result
def result_to_html_meta(self, record: dict) -> Optional[HtmlMetaRow]:
- html_body = record.get('html_body')
- file_meta = record.get('file_meta')
+ html_body = record.get("html_body")
+ file_meta = record.get("file_meta")
if not (file_meta and html_body):
return None
return HtmlMetaRow(
sha1hex=file_meta["sha1hex"],
- status=record.get('status'),
- scope=record.get('scope'),
- has_teixml=bool(html_body and html_body['status'] == 'success'),
+ status=record.get("status"),
+ scope=record.get("scope"),
+ has_teixml=bool(html_body and html_body["status"] == "success"),
has_thumbnail=False, # TODO
- word_count=(html_body and html_body.get('word_count')) or None,
- biblio=record.get('html_biblio'),
- resources=record.get('html_resources'),
+ word_count=(html_body and html_body.get("word_count")) or None,
+ biblio=record.get("html_biblio"),
+ resources=record.get("html_resources"),
)
def result_to_platform_row(self, raw: dict) -> Optional[dict]:
@@ -180,46 +194,49 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if there is a problem with conversion, return None and set skip count
"""
- for k in ('request', 'hit', 'status'):
+ for k in ("request", "hit", "status"):
if k not in raw:
return None
- if 'base_url' not in raw['request']:
+ if "base_url" not in raw["request"]:
return None
- ingest_type = raw['request'].get('ingest_type')
- if ingest_type not in ('dataset'):
+ ingest_type = raw["request"].get("ingest_type")
+ if ingest_type not in ("dataset"):
return None
- if raw['status'] in ("existing", ):
+ if raw["status"] in ("existing",):
return None
- if not raw.get('platform_name'):
+ if not raw.get("platform_name"):
return None
result = {
- 'ingest_type': ingest_type,
- 'base_url': raw['request']['base_url'],
- 'hit': raw['hit'],
- 'status': raw['status'],
- 'platform_name': raw.get('platform_name'),
- 'platform_domain': raw.get('platform_domain'),
- 'platform_id': raw.get('platform_id'),
- 'ingest_strategy': raw.get('ingest_strategy'),
- 'total_size': raw.get('total_size'),
- 'file_count': raw.get('file_count'),
- 'archiveorg_item_name': raw.get('archiveorg_item_name'),
- 'archiveorg_item_bundle_path': None,
- 'web_bundle_url': None,
- 'web_bundle_dt': None,
- 'manifest': raw.get('manifest'),
+ "ingest_type": ingest_type,
+ "base_url": raw["request"]["base_url"],
+ "hit": raw["hit"],
+ "status": raw["status"],
+ "platform_name": raw.get("platform_name"),
+ "platform_domain": raw.get("platform_domain"),
+ "platform_id": raw.get("platform_id"),
+ "ingest_strategy": raw.get("ingest_strategy"),
+ "total_size": raw.get("total_size"),
+ "file_count": raw.get("file_count"),
+ "archiveorg_item_name": raw.get("archiveorg_item_name"),
+ "archiveorg_item_bundle_path": None,
+ "web_bundle_url": None,
+ "web_bundle_dt": None,
+ "manifest": raw.get("manifest"),
}
- if result.get('fileset_bundle'):
- result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get(
- 'archiveorg_item_bundle_path')
- result['web_bundle_url'] = result['fileset_bundle'].get('terminal',
- {}).get('terminal_url')
- result['web_bundle_dt'] = result['fileset_bundle'].get('terminal',
- {}).get('terminal_dt')
+ if result.get("fileset_bundle"):
+ result["archiveorg_item_bundle_path"] = result["fileset_bundle"].get(
+ "archiveorg_item_bundle_path"
+ )
+ result["web_bundle_url"] = (
+ result["fileset_bundle"].get("terminal", {}).get("terminal_url")
+ )
+ result["web_bundle_dt"] = (
+ result["fileset_bundle"].get("terminal", {}).get("terminal_dt")
+ )
return result
def push_batch(self, batch: List[Any]) -> List[Any]:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
if not batch:
return []
@@ -228,60 +245,62 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
results = [r for r in results_unfiltered if r]
irequests_unfiltered = [
- self.request_to_row(raw['request']) for raw in batch if raw.get('request')
+ self.request_to_row(raw["request"]) for raw in batch if raw.get("request")
]
irequests = [
- r for r in irequests_unfiltered if r and r['ingest_type'] != 'dataset-file'
+ r for r in irequests_unfiltered if r and r["ingest_type"] != "dataset-file"
]
if irequests:
resp = self.db.insert_ingest_request(self.cur, irequests)
- self.counts['insert-requests'] += resp[0]
- self.counts['update-requests'] += resp[1]
+ self.counts["insert-requests"] += resp[0]
+ self.counts["update-requests"] += resp[1]
if results:
resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update")
- self.counts['insert-results'] += resp[0]
- self.counts['update-results'] += resp[1]
+ self.counts["insert-results"] += resp[0]
+ self.counts["update-results"] += resp[1]
# these schemas match, so can just pass through
- cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')]
+ cdx_batch = [r["cdx"] for r in batch if r.get("hit") and r.get("cdx")]
revisit_cdx_batch = [
- r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')
+ r["revisit_cdx"] for r in batch if r.get("hit") and r.get("revisit_cdx")
]
cdx_batch.extend(revisit_cdx_batch)
# filter to full CDX lines, with full warc_paths (not liveweb)
- cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])]
+ cdx_batch = [r for r in cdx_batch if r.get("warc_path") and ("/" in r["warc_path"])]
if cdx_batch:
resp = self.db.insert_cdx(self.cur, cdx_batch)
- self.counts['insert-cdx'] += resp[0]
- self.counts['update-cdx'] += resp[1]
+ self.counts["insert-cdx"] += resp[0]
+ self.counts["update-cdx"] += resp[1]
- file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
+ file_meta_batch = [r["file_meta"] for r in batch if r.get("hit") and r.get("file_meta")]
if file_meta_batch:
resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="nothing")
- self.counts['insert-file_meta'] += resp[0]
- self.counts['update-file_meta'] += resp[1]
+ self.counts["insert-file_meta"] += resp[0]
+ self.counts["update-file_meta"] += resp[1]
html_meta_batch = [
- self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')
+ self.result_to_html_meta(r) for r in batch if r.get("hit") and r.get("html_body")
]
if html_meta_batch:
rows = [d.to_sql_tuple() for d in html_meta_batch if d]
resp = self.db.insert_html_meta(self.cur, rows, on_conflict="update")
- self.counts['insert-html_meta'] += resp[0]
- self.counts['update-html_meta'] += resp[1]
+ self.counts["insert-html_meta"] += resp[0]
+ self.counts["update-html_meta"] += resp[1]
fileset_platform_batch_all = [
- self.result_to_platform_row(raw) for raw in batch if
- raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')
+ self.result_to_platform_row(raw)
+ for raw in batch
+ if raw.get("request", {}).get("ingest_type") == "dataset"
+ and raw.get("platform_name")
]
fileset_platform_batch: List[Dict] = [p for p in fileset_platform_batch_all if p]
if fileset_platform_batch:
- resp = self.db.insert_ingest_fileset_platform(self.cur,
- fileset_platform_batch,
- on_conflict="update")
- self.counts['insert-fileset_platform'] += resp[0]
- self.counts['update-fileset_platform'] += resp[1]
+ resp = self.db.insert_ingest_fileset_platform(
+ self.cur, fileset_platform_batch, on_conflict="update"
+ )
+ self.counts["insert-fileset_platform"] += resp[0]
+ self.counts["update-fileset_platform"] += resp[1]
self.db.commit()
return []
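
Illustrative row shapes for the ingest_request and ingest_file_result tables, matching the keys produced by request_to_row() and file_result_to_row() above; all values, including the ingest_request_source name and the DOI, are made up.

example_request_row = {
    "ingest_type": "pdf",
    "base_url": "https://doi.org/10.1234/abc123",  # placeholder DOI URL
    "link_source": "doi",
    "link_source_id": "10.1234/abc123",
    "ingest_request_source": "fatcat-changelog",   # assumed source name
    "release_stage": "published",
    "request": {"ext_ids": {"doi": "10.1234/abc123"}},
}
example_result_row = {
    "ingest_type": "pdf",
    "base_url": "https://doi.org/10.1234/abc123",
    "hit": True,
    "status": "success",
    "terminal_url": "https://example.com/fulltext.pdf",  # placeholder URL
    "terminal_dt": "20211027185017",
    "terminal_status_code": 200,
    "terminal_sha1hex": "0" * 40,                        # placeholder SHA-1
}
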
@@ -307,7 +326,7 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker):
raise NotImplementedError
def push_batch(self, batch: list) -> list:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
if not batch:
return []
@@ -317,8 +336,8 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker):
if irequests:
resp = self.db.insert_ingest_request(self.cur, irequests)
- self.counts['insert-requests'] += resp[0]
- self.counts['update-requests'] += resp[1]
+ self.counts["insert-requests"] += resp[0]
+ self.counts["update-requests"] += resp[1]
self.db.commit()
return []
@@ -329,13 +348,13 @@ class PersistGrobidWorker(SandcrawlerWorker):
super().__init__()
self.grobid = GrobidClient()
self.s3 = SandcrawlerMinioClient(
- host_url=kwargs.get('s3_url', 'localhost:9000'),
- access_key=kwargs['s3_access_key'],
- secret_key=kwargs['s3_secret_key'],
- default_bucket=kwargs['s3_bucket'],
+ host_url=kwargs.get("s3_url", "localhost:9000"),
+ access_key=kwargs["s3_access_key"],
+ secret_key=kwargs["s3_secret_key"],
+ default_bucket=kwargs["s3_bucket"],
)
- self.s3_only = kwargs.get('s3_only', False)
- self.db_only = kwargs.get('db_only', False)
+ self.s3_only = kwargs.get("s3_only", False)
+ self.db_only = kwargs.get("db_only", False)
assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed"
if not self.s3_only:
self.db: Optional[SandcrawlerPostgresClient] = SandcrawlerPostgresClient(db_url)
@@ -349,58 +368,58 @@ class PersistGrobidWorker(SandcrawlerWorker):
raise NotImplementedError
def push_batch(self, batch: list) -> list:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
# filter out bad "missing status_code" timeout rows
- missing = [r for r in batch if not r.get('status_code')]
+ missing = [r for r in batch if not r.get("status_code")]
if missing:
- self.counts['skip-missing-status'] += len(missing)
- batch = [r for r in batch if r.get('status_code')]
+ self.counts["skip-missing-status"] += len(missing)
+ batch = [r for r in batch if r.get("status_code")]
for r in batch:
- if r['status_code'] != 200 or not r.get('tei_xml'):
- self.counts['s3-skip-status'] += 1
- if r.get('error_msg'):
- r['metadata'] = {'error_msg': r['error_msg'][:500]}
+ if r["status_code"] != 200 or not r.get("tei_xml"):
+ self.counts["s3-skip-status"] += 1
+ if r.get("error_msg"):
+ r["metadata"] = {"error_msg": r["error_msg"][:500]}
continue
- assert len(r['key']) == 40
+ assert len(r["key"]) == 40
if not self.db_only:
self.s3.put_blob(
folder="grobid",
- blob=r['tei_xml'],
- sha1hex=r['key'],
+ blob=r["tei_xml"],
+ sha1hex=r["key"],
extension=".tei.xml",
)
- self.counts['s3-put'] += 1
+ self.counts["s3-put"] += 1
# enhance with teixml2json metadata, if available
try:
metadata = self.grobid.metadata(r)
except xml.etree.ElementTree.ParseError as xml_e:
- r['status'] = 'bad-grobid-xml'
- r['metadata'] = {'error_msg': str(xml_e)[:1024]}
+ r["status"] = "bad-grobid-xml"
+ r["metadata"] = {"error_msg": str(xml_e)[:1024]}
continue
if not metadata:
continue
- for k in ('fatcat_release', 'grobid_version'):
+ for k in ("fatcat_release", "grobid_version"):
r[k] = metadata.pop(k, None)
- if r.get('fatcat_release'):
- r['fatcat_release'] = r['fatcat_release'].replace('release_', '')
- if metadata.get('grobid_timestamp'):
- r['updated'] = metadata['grobid_timestamp']
- r['metadata'] = metadata
+ if r.get("fatcat_release"):
+ r["fatcat_release"] = r["fatcat_release"].replace("release_", "")
+ if metadata.get("grobid_timestamp"):
+ r["updated"] = metadata["grobid_timestamp"]
+ r["metadata"] = metadata
if not self.s3_only:
assert self.db and self.cur
resp = self.db.insert_grobid(self.cur, batch, on_conflict="update")
- self.counts['insert-grobid'] += resp[0]
- self.counts['update-grobid'] += resp[1]
+ self.counts["insert-grobid"] += resp[0]
+ self.counts["update-grobid"] += resp[1]
- file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+ file_meta_batch = [r["file_meta"] for r in batch if r.get("file_meta")]
resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
- self.counts['insert-file-meta'] += resp[0]
- self.counts['update-file-meta'] += resp[1]
+ self.counts["insert-file-meta"] += resp[0]
+ self.counts["update-file-meta"] += resp[1]
self.db.commit()
@@ -413,6 +432,7 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
This could be refactored into a "Sink" type with an even thinner wrapper.
"""
+
def __init__(self, output_dir: str):
super().__init__()
self.output_dir = output_dir
@@ -428,14 +448,14 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
def process(self, record: Any, key: Optional[str] = None) -> Any:
- if record.get('status_code') != 200 or not record.get('tei_xml'):
+ if record.get("status_code") != 200 or not record.get("tei_xml"):
return False
- assert (len(record['key'])) == 40
- p = "{}/{}".format(self.output_dir, self._blob_path(record['key']))
+ assert (len(record["key"])) == 40
+ p = "{}/{}".format(self.output_dir, self._blob_path(record["key"]))
os.makedirs(os.path.dirname(p), exist_ok=True)
- with open(p, 'w') as f:
- f.write(record.pop('tei_xml'))
- self.counts['written'] += 1
+ with open(p, "w") as f:
+ f.write(record.pop("tei_xml"))
+ self.counts["written"] += 1
return record
@@ -450,24 +470,25 @@ class PersistPdfTrioWorker(SandcrawlerWorker):
raise NotImplementedError
def push_batch(self, batch: list) -> list:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
- batch = [r for r in batch if 'pdf_trio' in r and r['pdf_trio'].get('status_code')]
+ batch = [r for r in batch if "pdf_trio" in r and r["pdf_trio"].get("status_code")]
for r in batch:
# copy key (sha1hex) into sub-object
- r['pdf_trio']['key'] = r['key']
- pdftrio_batch = [r['pdf_trio'] for r in batch]
+ r["pdf_trio"]["key"] = r["key"]
+ pdftrio_batch = [r["pdf_trio"] for r in batch]
resp = self.db.insert_pdftrio(self.cur, pdftrio_batch, on_conflict="update")
- self.counts['insert-pdftrio'] += resp[0]
- self.counts['update-pdftrio'] += resp[1]
+ self.counts["insert-pdftrio"] += resp[0]
+ self.counts["update-pdftrio"] += resp[1]
file_meta_batch = [
- r['file_meta'] for r in batch
- if r['pdf_trio']['status'] == "success" and r.get('file_meta')
+ r["file_meta"]
+ for r in batch
+ if r["pdf_trio"]["status"] == "success" and r.get("file_meta")
]
resp = self.db.insert_file_meta(self.cur, file_meta_batch)
- self.counts['insert-file-meta'] += resp[0]
- self.counts['update-file-meta'] += resp[1]
+ self.counts["insert-file-meta"] += resp[0]
+ self.counts["update-file-meta"] += resp[1]
self.db.commit()
return []
@@ -479,16 +500,17 @@ class PersistPdfTextWorker(SandcrawlerWorker):
Should keep batch sizes small.
"""
+
def __init__(self, db_url: str, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
- host_url=kwargs.get('s3_url', 'localhost:9000'),
- access_key=kwargs['s3_access_key'],
- secret_key=kwargs['s3_secret_key'],
- default_bucket=kwargs['s3_bucket'],
+ host_url=kwargs.get("s3_url", "localhost:9000"),
+ access_key=kwargs["s3_access_key"],
+ secret_key=kwargs["s3_secret_key"],
+ default_bucket=kwargs["s3_bucket"],
)
- self.s3_only = kwargs.get('s3_only', False)
- self.db_only = kwargs.get('db_only', False)
+ self.s3_only = kwargs.get("s3_only", False)
+ self.db_only = kwargs.get("db_only", False)
assert not (self.s3_only and self.db_only), "Only one of s3_only and db_only allowed"
if not self.s3_only:
self.db: Optional[SandcrawlerPostgresClient] = SandcrawlerPostgresClient(db_url)
@@ -502,17 +524,17 @@ class PersistPdfTextWorker(SandcrawlerWorker):
raise NotImplementedError
def push_batch(self, batch: list) -> list:
- self.counts['total'] += len(batch)
+ self.counts["total"] += len(batch)
parsed_batch = []
for r in batch:
parsed_batch.append(PdfExtractResult.from_pdftext_dict(r))
for r in parsed_batch:
- if r.status != 'success' or not r.text:
- self.counts['s3-skip-status'] += 1
+ if r.status != "success" or not r.text:
+ self.counts["s3-skip-status"] += 1
if r.error_msg:
- r.metadata = {'error_msg': r.error_msg[:500]}
+ r.metadata = {"error_msg": r.error_msg[:500]}
continue
assert len(r.sha1hex) == 40
@@ -523,19 +545,19 @@ class PersistPdfTextWorker(SandcrawlerWorker):
sha1hex=r.sha1hex,
extension=".txt",
)
- self.counts['s3-put'] += 1
+ self.counts["s3-put"] += 1
if not self.s3_only:
assert self.db and self.cur
rows = [r.to_sql_tuple() for r in parsed_batch]
resp = self.db.insert_pdf_meta(self.cur, rows, on_conflict="update")
- self.counts['insert-pdf-meta'] += resp[0]
- self.counts['update-pdf-meta'] += resp[1]
+ self.counts["insert-pdf-meta"] += resp[0]
+ self.counts["update-pdf-meta"] += resp[1]
file_meta_batch = [r.file_meta for r in parsed_batch if r.file_meta]
resp = self.db.insert_file_meta(self.cur, file_meta_batch, on_conflict="update")
- self.counts['insert-file-meta'] += resp[0]
- self.counts['update-file-meta'] += resp[1]
+ self.counts["insert-file-meta"] += resp[0]
+ self.counts["update-file-meta"] += resp[1]
self.db.commit()
@@ -550,16 +572,17 @@ class PersistThumbnailWorker(SandcrawlerWorker):
This worker *must* be used with raw kafka mode; thumbnails are *not*
wrapped in JSON like most sandcrawler kafka messages.
"""
+
def __init__(self, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
- host_url=kwargs.get('s3_url', 'localhost:9000'),
- access_key=kwargs['s3_access_key'],
- secret_key=kwargs['s3_secret_key'],
- default_bucket=kwargs['s3_bucket'],
+ host_url=kwargs.get("s3_url", "localhost:9000"),
+ access_key=kwargs["s3_access_key"],
+ secret_key=kwargs["s3_secret_key"],
+ default_bucket=kwargs["s3_bucket"],
)
- self.s3_extension = kwargs.get('s3_extension', ".jpg")
- self.s3_folder = kwargs.get('s3_folder', "pdf")
+ self.s3_extension = kwargs.get("s3_extension", ".jpg")
+ self.s3_folder = kwargs.get("s3_folder", "pdf")
def process(self, record: Any, key: Optional[str] = None) -> Any:
"""
@@ -569,7 +592,7 @@ class PersistThumbnailWorker(SandcrawlerWorker):
assert isinstance(record, bytes)
blob: bytes = record
if isinstance(key, bytes):
- key = key.decode('utf-8')
+ key = key.decode("utf-8")
assert key is not None and len(key) == 40 and isinstance(key, str)
assert len(blob) >= 50
@@ -579,7 +602,7 @@ class PersistThumbnailWorker(SandcrawlerWorker):
sha1hex=key,
extension=self.s3_extension,
)
- self.counts['s3-put'] += 1
+ self.counts["s3-put"] += 1
class GenericPersistDocWorker(SandcrawlerWorker):
@@ -588,39 +611,40 @@ class GenericPersistDocWorker(SandcrawlerWorker):
Objects are assumed to be JSON-wrapped strings.
"""
+
def __init__(self, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
- host_url=kwargs.get('s3_url', 'localhost:9000'),
- access_key=kwargs['s3_access_key'],
- secret_key=kwargs['s3_secret_key'],
- default_bucket=kwargs['s3_bucket'],
+ host_url=kwargs.get("s3_url", "localhost:9000"),
+ access_key=kwargs["s3_access_key"],
+ secret_key=kwargs["s3_secret_key"],
+ default_bucket=kwargs["s3_bucket"],
)
- self.s3_extension = kwargs.get('s3_extension', ".unknown")
- self.s3_folder = kwargs.get('s3_folder', "unknown")
+ self.s3_extension = kwargs.get("s3_extension", ".unknown")
+ self.s3_folder = kwargs.get("s3_folder", "unknown")
self.doc_key = "unknown"
def process(self, record: Any, key: Optional[str] = None) -> Any:
- if record.get('status') != 'success' or not record.get(self.doc_key):
+ if record.get("status") != "success" or not record.get(self.doc_key):
return
assert key is not None
if isinstance(key, bytes):
- key_str = key.decode('utf-8')
+ key_str = key.decode("utf-8")
elif isinstance(key, str):
key_str = key
assert len(key_str) == 40
- if 'sha1hex' in record:
- assert key_str == record['sha1hex']
+ if "sha1hex" in record:
+ assert key_str == record["sha1hex"]
self.s3.put_blob(
folder=self.s3_folder,
- blob=record[self.doc_key].encode('utf-8'),
+ blob=record[self.doc_key].encode("utf-8"),
sha1hex=key_str,
extension=self.s3_extension,
)
- self.counts['s3-put'] += 1
+ self.counts["s3-put"] += 1
class PersistXmlDocWorker(GenericPersistDocWorker):
@@ -628,10 +652,11 @@ class PersistXmlDocWorker(GenericPersistDocWorker):
Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
sandcrawler database (SQL).
"""
+
def __init__(self, **kwargs):
super().__init__(**kwargs)
- self.s3_extension = kwargs.get('s3_extension', ".jats.xml")
- self.s3_folder = kwargs.get('s3_folder', "xml_doc")
+ self.s3_extension = kwargs.get("s3_extension", ".jats.xml")
+ self.s3_folder = kwargs.get("s3_folder", "xml_doc")
self.doc_key = "jats_xml"
@@ -640,8 +665,9 @@ class PersistHtmlTeiXmlWorker(GenericPersistDocWorker):
Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
sandcrawler database (SQL).
"""
+
def __init__(self, **kwargs):
super().__init__(**kwargs)
- self.s3_extension = kwargs.get('s3_extension', ".tei.xml")
- self.s3_folder = kwargs.get('s3_folder', "html_body")
+ self.s3_extension = kwargs.get("s3_extension", ".tei.xml")
+ self.s3_folder = kwargs.get("s3_folder", "html_body")
self.doc_key = "tei_xml"
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index ceb6671..bd7f36a 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -10,8 +10,13 @@ from typing import Any, Dict, List, Optional, Sequence
import requests
from confluent_kafka import Consumer, KafkaException, Producer
-from .ia import (PetaboxError, SandcrawlerBackoffError, WaybackClient, WaybackContentError,
- WaybackError)
+from .ia import (
+ PetaboxError,
+ SandcrawlerBackoffError,
+ WaybackClient,
+ WaybackContentError,
+ WaybackError,
+)
from .misc import parse_cdx_line
@@ -22,25 +27,26 @@ class SandcrawlerWorker(object):
Usually these get "pushed" into by a RecordPusher. Output goes to another
worker (pipeline-style), or defaults to stdout.
"""
- def __init__(self, sink: Optional['SandcrawlerWorker'] = None):
+
+ def __init__(self, sink: Optional["SandcrawlerWorker"] = None):
self.counts: Counter = Counter()
self.sink: Optional[SandcrawlerWorker] = sink
def push_record(self, task: Any, key: Optional[str] = None) -> Any:
- self.counts['total'] += 1
+ self.counts["total"] += 1
if not self.want(task):
- self.counts['skip'] += 1
+ self.counts["skip"] += 1
return
result = self.process(task, key=key)
if not result:
- self.counts['failed'] += 1
+ self.counts["failed"] += 1
return
- elif type(result) == dict and 'status' in result and len(result['status']) < 32:
- self.counts[result['status']] += 1
+ elif type(result) == dict and "status" in result and len(result["status"]) < 32:
+ self.counts[result["status"]] += 1
if self.sink:
self.sink.push_record(result)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
else:
print(json.dumps(result))
return result
@@ -53,10 +59,9 @@ class SandcrawlerWorker(object):
"""
return None
- def push_record_timeout(self,
- task: Any,
- key: Optional[str] = None,
- timeout: int = 300) -> Any:
+ def push_record_timeout(
+ self, task: Any, key: Optional[str] = None, timeout: int = 300
+ ) -> Any:
"""
A wrapper around self.push_record which sets a timeout.
@@ -64,6 +69,7 @@ class SandcrawlerWorker(object):
multithreading or if signal-based timeouts are used elsewhere in the
same process.
"""
+
def timeout_handler(signum: int, frame: Any) -> None:
raise TimeoutError("timeout processing record")
@@ -73,12 +79,12 @@ class SandcrawlerWorker(object):
try:
resp = self.push_record(task, key=key)
except TimeoutError:
- self.counts['timeout'] += 1
+ self.counts["timeout"] += 1
resp = self.timeout_response(task) # pylint: disable=assignment-from-none
# TODO: what if it is this push_record() itself that is timing out?
if resp and self.sink:
self.sink.push_record(resp)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
elif resp:
print(json.dumps(resp))
finally:
@@ -109,7 +115,7 @@ class SandcrawlerWorker(object):
TODO: should derived workers explicitly type-check the 'task' object?
"""
- raise NotImplementedError('implementation required')
+ raise NotImplementedError("implementation required")
class SandcrawlerFetchWorker(SandcrawlerWorker):
@@ -117,25 +123,26 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
PDFs) from wayback, archive.org, or other sources.
"""
+
def __init__(self, wayback_client: Optional[WaybackClient], **kwargs):
super().__init__(**kwargs)
self.wayback_client = wayback_client
def fetch_blob(self, record: Dict[str, Any]) -> Dict[str, Any]:
- default_key = record['sha1hex']
+ default_key = record["sha1hex"]
wayback_sec = None
petabox_sec = None
- if record.get('warc_path') and record.get('warc_offset'):
+ if record.get("warc_path") and record.get("warc_offset"):
# it's a full CDX dict. fetch using WaybackClient
if not self.wayback_client:
raise Exception("wayback client not configured for this SandcrawlerFetchWorker")
try:
start = time.time()
blob: bytes = self.wayback_client.fetch_petabox_body(
- csize=record['warc_csize'],
- offset=record['warc_offset'],
- warc_path=record['warc_path'],
+ csize=record["warc_csize"],
+ offset=record["warc_offset"],
+ warc_path=record["warc_path"],
)
wayback_sec = time.time() - start
except (WaybackError, WaybackContentError, PetaboxError, KeyError) as we:
@@ -145,15 +152,15 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
status="error-wayback",
error_msg=str(we),
)
- elif record.get('url') and record.get('datetime'):
+ elif record.get("url") and record.get("datetime"):
# it's a partial CDX dict or something? fetch using WaybackClient
if not self.wayback_client:
raise Exception("wayback client not configured for this SandcrawlerFetchWorker")
try:
start = time.time()
blob = self.wayback_client.fetch_replay_body(
- url=record['url'],
- datetime=record['datetime'],
+ url=record["url"],
+ datetime=record["datetime"],
)
wayback_sec = time.time() - start
except (WaybackError, WaybackContentError) as we:
@@ -163,11 +170,12 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
status="error-wayback",
error_msg=str(we),
)
- elif record.get('item') and record.get('path'):
+ elif record.get("item") and record.get("path"):
# it's a petabox link; fetch via HTTP
start = time.time()
- ia_resp = requests.get("https://archive.org/serve/{}/{}".format(
- record['item'], record['path']))
+ ia_resp = requests.get(
+ "https://archive.org/serve/{}/{}".format(record["item"], record["path"])
+ )
petabox_sec = time.time() - start
try:
ia_resp.raise_for_status()
@@ -181,7 +189,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
blob = ia_resp.content
else:
raise ValueError(
- "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed"
+ )
if not blob:
return dict(
key=default_key,
@@ -201,29 +210,31 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
class MultiprocessWrapper(SandcrawlerWorker):
- def __init__(self,
- worker: SandcrawlerWorker,
- sink: Optional[SandcrawlerWorker] = None,
- jobs: Optional[int] = None):
+ def __init__(
+ self,
+ worker: SandcrawlerWorker,
+ sink: Optional[SandcrawlerWorker] = None,
+ jobs: Optional[int] = None,
+ ):
self.counts = Counter()
self.worker = worker
self.sink = sink
self.pool = multiprocessing.pool.Pool(jobs)
def push_batch(self, tasks: List[Any]) -> List[Any]:
- self.counts['total'] += len(tasks)
+ self.counts["total"] += len(tasks)
print("... processing batch of: {}".format(len(tasks)), file=sys.stderr)
results = self.pool.map(self.worker.process, tasks)
for result in results:
if not result:
- self.counts['failed'] += 1
+ self.counts["failed"] += 1
return []
- elif type(result) == dict and 'status' in result and len(result['status']) < 32:
- self.counts[result['status']] += 1
+ elif type(result) == dict and "status" in result and len(result["status"]) < 32:
+ self.counts[result["status"]] += 1
if self.sink:
self.sink.push_record(result)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
else:
print(json.dumps(result))
return results
@@ -243,6 +254,7 @@ class BlackholeSink(SandcrawlerWorker):
Useful for tests.
"""
+
def push_record(self, task: Any, key: Optional[str] = None) -> Any:
return
@@ -257,12 +269,14 @@ class KafkaSink(SandcrawlerWorker):
self.produce_topic = produce_topic
self.kafka_hosts = kafka_hosts
- config = self.producer_config({
- 'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
- 'api.version.request': True,
- 'api.version.fallback.ms': 0,
- })
+ config = self.producer_config(
+ {
+ "bootstrap.servers": kafka_hosts,
+ "message.max.bytes": 30000000, # ~30 MBytes; broker is ~50 MBytes
+ "api.version.request": True,
+ "api.version.fallback.ms": 0,
+ }
+ )
self.producer = Producer(config)
@staticmethod
@@ -275,27 +289,29 @@ class KafkaSink(SandcrawlerWorker):
def producer_config(self, kafka_config: dict) -> dict:
config = kafka_config.copy()
- config.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ config.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "message.timeout.ms": 30000,
+ "request.required.acks": -1, # all brokers must confirm
+ },
}
- })
+ )
return config
def push_record(self, msg: Any, key: Optional[str] = None) -> Any:
- self.counts['total'] += 1
+ self.counts["total"] += 1
if type(msg) == dict:
- if not key and 'key' in msg:
- key = msg['key']
+ if not key and "key" in msg:
+ key = msg["key"]
msg = json.dumps(msg)
if type(msg) == str:
- msg = msg.encode('utf-8')
+ msg = msg.encode("utf-8")
assert type(msg) == bytes
self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast)
- self.counts['produced'] += 1
+ self.counts["produced"] += 1
# check for errors etc
self.producer.poll(0)
@@ -314,19 +330,22 @@ class KafkaCompressSink(KafkaSink):
"""
Variant of KafkaSink for large documents. Used for, eg, GROBID output.
"""
+
def producer_config(self, kafka_config: Dict[str, Any]) -> Dict[str, Any]:
config = kafka_config.copy()
- config.update({
- 'compression.codec': 'gzip',
- 'retry.backoff.ms': 250,
- 'linger.ms': 1000,
- 'batch.num.messages': 50,
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ config.update(
+ {
+ "compression.codec": "gzip",
+ "retry.backoff.ms": 250,
+ "linger.ms": 1000,
+ "batch.num.messages": 50,
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "message.timeout.ms": 30000,
+ "request.required.acks": -1, # all brokers must confirm
+ },
}
- })
+ )
return config
@@ -335,6 +354,7 @@ class RecordPusher:
Base class for different record sources to be pushed into workers. Pretty
trivial interface, just wraps an importer and pushes records in to it.
"""
+
def __init__(self, worker: SandcrawlerWorker, **kwargs):
self.counts: Counter = Counter()
self.worker: SandcrawlerWorker = worker
@@ -356,7 +376,7 @@ class JsonLinePusher(RecordPusher):
self.counts = Counter()
self.worker = worker
self.json_file = json_file
- self.batch_size = kwargs.get('batch_size', None)
+ self.batch_size = kwargs.get("batch_size", None)
if self.batch_size in (0, 1):
self.batch_size = None
@@ -365,24 +385,24 @@ class JsonLinePusher(RecordPusher):
for line in self.json_file:
if not line:
continue
- self.counts['total'] += 1
+ self.counts["total"] += 1
try:
record = json.loads(line)
except json.decoder.JSONDecodeError:
- self.counts['error-json-decode'] += 1
+ self.counts["error-json-decode"] += 1
continue
if self.batch_size:
batch.append(record)
if len(batch) >= self.batch_size:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
else:
self.worker.push_record(record)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
if self.batch_size and batch:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
self.worker.finish()
print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
@@ -394,10 +414,10 @@ class CdxLinePusher(RecordPusher):
self.counts = Counter()
self.worker = worker
self.cdx_file = cdx_file
- self.filter_http_statuses = kwargs.get('filter_http_statuses', None)
- self.filter_mimetypes = kwargs.get('filter_mimetypes', None)
- self.allow_octet_stream = kwargs.get('allow_octet_stream', False)
- self.batch_size = kwargs.get('batch_size', None)
+ self.filter_http_statuses = kwargs.get("filter_http_statuses", None)
+ self.filter_mimetypes = kwargs.get("filter_mimetypes", None)
+ self.allow_octet_stream = kwargs.get("allow_octet_stream", False)
+ self.batch_size = kwargs.get("batch_size", None)
if self.batch_size in (0, 1):
self.batch_size = None
@@ -406,30 +426,32 @@ class CdxLinePusher(RecordPusher):
for line in self.cdx_file:
if not line:
continue
- self.counts['total'] += 1
+ self.counts["total"] += 1
record = parse_cdx_line(line, normalize=True)
if not record:
- self.counts['skip-parse'] += 1
+ self.counts["skip-parse"] += 1
continue
- if self.filter_http_statuses and record[
- 'http_status'] not in self.filter_http_statuses:
- self.counts['skip-http_status'] += 1
+ if (
+ self.filter_http_statuses
+ and record["http_status"] not in self.filter_http_statuses
+ ):
+ self.counts["skip-http_status"] += 1
continue
- if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
- self.counts['skip-mimetype'] += 1
+ if self.filter_mimetypes and record["mimetype"] not in self.filter_mimetypes:
+ self.counts["skip-mimetype"] += 1
continue
if self.batch_size:
batch.append(record)
if len(batch) >= self.batch_size:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
else:
self.worker.push_record(record)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
if self.batch_size and batch:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
self.worker.finish()
print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
@@ -442,33 +464,33 @@ class ZipfilePusher(RecordPusher):
self.worker = worker
self.filter_suffix = ".pdf"
self.zipfile_path = zipfile_path
- self.batch_size = kwargs.get('batch_size', None)
+ self.batch_size = kwargs.get("batch_size", None)
if self.batch_size in (0, 1):
self.batch_size = None
def run(self) -> Counter:
batch = []
- with zipfile.ZipFile(self.zipfile_path, 'r') as archive:
+ with zipfile.ZipFile(self.zipfile_path, "r") as archive:
for zipinfo in archive.infolist():
if not zipinfo.filename.endswith(self.filter_suffix):
continue
- self.counts['total'] += 1
+ self.counts["total"] += 1
# NB doesn't really extract the file, just gives you a stream (file-like-object) for reading it
- flo = archive.open(zipinfo, 'r')
- data = flo.read(2**32)
+ flo = archive.open(zipinfo, "r")
+ data = flo.read(2 ** 32)
flo.close()
if self.batch_size:
batch.append(data)
if len(batch) >= self.batch_size:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
else:
self.worker.push_record(data)
- self.counts['pushed'] += 1
+ self.counts["pushed"] += 1
if self.batch_size and batch:
self.worker.push_batch(batch)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
batch = []
self.worker.finish()
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
@@ -476,8 +498,14 @@ class ZipfilePusher(RecordPusher):
class KafkaJsonPusher(RecordPusher):
- def __init__(self, worker: SandcrawlerWorker, kafka_hosts: str, consume_topic: str,
- group: str, **kwargs):
+ def __init__(
+ self,
+ worker: SandcrawlerWorker,
+ kafka_hosts: str,
+ consume_topic: str,
+ group: str,
+ **kwargs
+ ):
self.counts = Counter()
self.worker = worker
self.consumer = make_kafka_consumer(
@@ -485,14 +513,14 @@ class KafkaJsonPusher(RecordPusher):
consume_topic,
group,
)
- self.push_batches = kwargs.get('push_batches', False)
- self.raw_records = kwargs.get('raw_records', False)
- self.poll_interval = kwargs.get('poll_interval', 5.0)
- self.batch_size = kwargs.get('batch_size', 100)
+ self.push_batches = kwargs.get("push_batches", False)
+ self.raw_records = kwargs.get("raw_records", False)
+ self.poll_interval = kwargs.get("poll_interval", 5.0)
+ self.batch_size = kwargs.get("batch_size", 100)
if self.batch_size in (0, 1):
self.batch_size = 1
- self.batch_worker = kwargs.get('batch_worker', False)
- self.process_timeout_sec = kwargs.get('process_timeout_sec', 300)
+ self.batch_worker = kwargs.get("batch_worker", False)
+ self.process_timeout_sec = kwargs.get("process_timeout_sec", 300)
def run(self) -> Counter:
while True:
@@ -502,11 +530,15 @@ class KafkaJsonPusher(RecordPusher):
# case where there is one update and thousands of creates;
# update would be lingering in worker, and if worker crashed
# never created. Not great.
- batch = self.consumer.consume(num_messages=self.batch_size,
- timeout=self.poll_interval)
- print("... got {} kafka messages ({}sec poll interval)".format(
- len(batch), self.poll_interval),
- file=sys.stderr)
+ batch = self.consumer.consume(
+ num_messages=self.batch_size, timeout=self.poll_interval
+ )
+ print(
+ "... got {} kafka messages ({}sec poll interval)".format(
+ len(batch), self.poll_interval
+ ),
+ file=sys.stderr,
+ )
if not batch:
# TODO: could have some larger timeout here and
# self.worker.finish() if it's been more than, eg, a couple
@@ -518,14 +550,14 @@ class KafkaJsonPusher(RecordPusher):
raise KafkaException(msg.error())
# ... then process
if self.push_batches:
- self.counts['total'] += len(batch)
- records = [json.loads(msg.value().decode('utf-8')) for msg in batch]
+ self.counts["total"] += len(batch)
+ records = [json.loads(msg.value().decode("utf-8")) for msg in batch]
self.worker.push_batch(records)
- self.counts['pushed'] += len(batch)
+ self.counts["pushed"] += len(batch)
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
else:
for msg in batch:
- self.counts['total'] += 1
+ self.counts["total"] += 1
if self.raw_records:
# In this mode, pass the Kafka message as bytes through
# without decoding as JSON. Eg, for thumbnails (where
@@ -533,7 +565,7 @@ class KafkaJsonPusher(RecordPusher):
# from the message)
record = msg.value()
else:
- record = json.loads(msg.value().decode('utf-8'))
+ record = json.loads(msg.value().decode("utf-8"))
# This complex bit of code implements backoff/backpressure
# in a way that will not cause this Kafka consumer to lose
# partition assignments (resulting in a rebalance). This
@@ -543,9 +575,9 @@ class KafkaJsonPusher(RecordPusher):
while not done:
try:
# use timeouts; don't want kafka itself to timeout
- self.worker.push_record_timeout(record,
- key=msg.key(),
- timeout=self.process_timeout_sec)
+ self.worker.push_record_timeout(
+ record, key=msg.key(), timeout=self.process_timeout_sec
+ )
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))
@@ -557,8 +589,8 @@ class KafkaJsonPusher(RecordPusher):
assert not empty_batch
time.sleep(5)
self.consumer.resume(self.consumer.assignment())
- self.counts['pushed'] += 1
- if self.counts['total'] % 500 == 0:
+ self.counts["pushed"] += 1
+ if self.counts["total"] % 500 == 0:
print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
for msg in batch:
# locally store offsets of processed messages; will be
@@ -589,25 +621,25 @@ def make_kafka_consumer(hosts: str, consume_topic: str, group: str) -> Consumer:
print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(p.error)
- #print("Kafka consumer commit successful")
+ # print("Kafka consumer commit successful")
pass
# previously, using pykafka
- #auto_commit_enable=True,
- #auto_commit_interval_ms=30000, # 30 seconds
+ # auto_commit_enable=True,
+ # auto_commit_interval_ms=30000, # 30 seconds
conf = {
- 'bootstrap.servers': hosts,
- 'group.id': group,
- 'on_commit': fail_fast,
+ "bootstrap.servers": hosts,
+ "group.id": group,
+ "on_commit": fail_fast,
# messages don't have offset marked as stored until processed,
# but we do auto-commit stored offsets to broker
- 'enable.auto.offset.store': False,
- 'enable.auto.commit': True,
+ "enable.auto.offset.store": False,
+ "enable.auto.commit": True,
# user code timeout; if no poll after this long, assume user code
# hung and rebalance (default: 6min)
- 'max.poll.interval.ms': 360000,
- 'default.topic.config': {
- 'auto.offset.reset': 'latest',
+ "max.poll.interval.ms": 360000,
+ "default.topic.config": {
+ "auto.offset.reset": "latest",
},
}
@@ -615,8 +647,9 @@ def make_kafka_consumer(hosts: str, consume_topic: str, group: str) -> Consumer:
for p in partitions:
if p.error:
raise KafkaException(p.error)
- print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions),
- file=sys.stderr)
+ print(
+ "Kafka partitions rebalanced: {} / {}".format(consumer, partitions), file=sys.stderr
+ )
consumer = Consumer(conf)
# NOTE: it's actually important that topic_name *not* be bytes (UTF-8