path: root/python/sandcrawler
author     Bryan Newbold <bnewbold@archive.org>   2021-10-26 12:54:37 -0700
committer  Bryan Newbold <bnewbold@archive.org>   2021-10-26 12:54:37 -0700
commit     05bd7cbcc62588e431c5efd533189e246b2a997e (patch)
tree       abcc707a451e77ea1e8c5ac9a5925b97a4bd139a /python/sandcrawler
parent     f3f424e42f2f4f383103cf80b30a00cfa6cfc179 (diff)
download   sandcrawler-05bd7cbcc62588e431c5efd533189e246b2a997e.tar.gz
           sandcrawler-05bd7cbcc62588e431c5efd533189e246b2a997e.zip
make fmt
Diffstat (limited to 'python/sandcrawler')
-rw-r--r--  python/sandcrawler/__init__.py            |  18
-rw-r--r--  python/sandcrawler/db.py                  | 150
-rw-r--r--  python/sandcrawler/fileset_platforms.py   | 169
-rw-r--r--  python/sandcrawler/fileset_strategies.py  |  68
-rw-r--r--  python/sandcrawler/fileset_types.py       |   6
-rw-r--r--  python/sandcrawler/grobid.py              |  30
-rw-r--r--  python/sandcrawler/html.py                |  49
-rw-r--r--  python/sandcrawler/html_metadata.py       |  37
-rw-r--r--  python/sandcrawler/ia.py                  | 192
-rw-r--r--  python/sandcrawler/ingest_file.py         | 145
-rw-r--r--  python/sandcrawler/ingest_fileset.py      | 101
-rw-r--r--  python/sandcrawler/ingest_html.py         | 120
-rw-r--r--  python/sandcrawler/minio.py               |   2
-rw-r--r--  python/sandcrawler/misc.py                |  64
-rw-r--r--  python/sandcrawler/pdfextract.py          |  29
-rw-r--r--  python/sandcrawler/pdftrio.py             |  10
-rw-r--r--  python/sandcrawler/persist.py             |  61
-rw-r--r--  python/sandcrawler/workers.py             |  60
-rw-r--r--  python/sandcrawler/xml.py                 |   1
19 files changed, 741 insertions, 571 deletions
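
Note: the changes below are a pure formatting pass (isort-style import grouping plus yapf-style wrapping of long lines at roughly 90 columns); no behavior changes. The `make fmt` target itself is not shown on this page, so the following is only a minimal sketch of an equivalent invocation, assuming isort and yapf are the tools in use:

    # hypothetical approximation of `make fmt` (assumption: isort + yapf; the real
    # Makefile target and tool configuration are not recorded on this page)
    import subprocess

    subprocess.run(["isort", "python/sandcrawler/"], check=True)
    subprocess.run(["yapf", "--in-place", "--recursive", "python/sandcrawler/"], check=True)
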
diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py
index bf2d92d..46735eb 100644
--- a/python/sandcrawler/__init__.py
+++ b/python/sandcrawler/__init__.py
@@ -1,14 +1,16 @@
-
from .db import SandcrawlerPostgresClient, SandcrawlerPostgrestClient
from .grobid import GrobidBlobWorker, GrobidClient, GrobidWorker
-from .ia import (CdxApiClient, CdxApiError, CdxPartial, CdxRow, PetaboxError, ResourceResult, SavePageNowClient,
- SavePageNowError, WarcResource, WaybackClient, WaybackContentError, WaybackError)
+from .ia import (CdxApiClient, CdxApiError, CdxPartial, CdxRow, PetaboxError, ResourceResult,
+ SavePageNowClient, SavePageNowError, WarcResource, WaybackClient,
+ WaybackContentError, WaybackError)
from .ingest_file import IngestFileWorker
from .ingest_fileset import IngestFilesetWorker
-from .misc import b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path, parse_cdx_datetime, parse_cdx_line
+from .misc import (b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path,
+ parse_cdx_datetime, parse_cdx_line)
from .pdfextract import PdfExtractBlobWorker, PdfExtractWorker
from .pdftrio import PdfTrioBlobWorker, PdfTrioClient, PdfTrioWorker
-from .persist import (PersistCdxWorker, PersistGrobidDiskWorker, PersistGrobidWorker, PersistIngestFileResultWorker,
- PersistIngestRequestWorker, PersistPdfTextWorker, PersistPdfTrioWorker, PersistThumbnailWorker)
-from .workers import (BlackholeSink, CdxLinePusher, JsonLinePusher, KafkaCompressSink, KafkaJsonPusher, KafkaSink,
- MultiprocessWrapper, ZipfilePusher)
+from .persist import (PersistCdxWorker, PersistGrobidDiskWorker, PersistGrobidWorker,
+ PersistIngestFileResultWorker, PersistIngestRequestWorker,
+ PersistPdfTextWorker, PersistPdfTrioWorker, PersistThumbnailWorker)
+from .workers import (BlackholeSink, CdxLinePusher, JsonLinePusher, KafkaCompressSink,
+ KafkaJsonPusher, KafkaSink, MultiprocessWrapper, ZipfilePusher)
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 4dcdb0e..360add9 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -1,4 +1,3 @@
-
import datetime
import json
from typing import Optional
@@ -9,17 +8,16 @@ import requests
class SandcrawlerPostgrestClient:
-
def __init__(self, api_url="http://wbgrp-svc506.us.archive.org:3030", **kwargs):
self.api_url = api_url
def get_cdx(self, url):
- resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url))
+ resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.' + url))
resp.raise_for_status()
return resp.json() or None
def get_grobid(self, sha1):
- resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1))
+ resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
resp = resp.json()
if resp:
@@ -28,7 +26,7 @@ class SandcrawlerPostgrestClient:
return None
def get_pdftrio(self, sha1):
- resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.'+sha1))
+ resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
resp = resp.json()
if resp:
@@ -37,7 +35,7 @@ class SandcrawlerPostgrestClient:
return None
def get_pdf_meta(self, sha1):
- resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.'+sha1))
+ resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
resp = resp.json()
if resp:
@@ -58,7 +56,7 @@ class SandcrawlerPostgrestClient:
return None
def get_file_meta(self, sha1):
- resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1))
+ resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.' + sha1))
resp.raise_for_status()
resp = resp.json()
if resp:
@@ -91,7 +89,7 @@ class SandcrawlerPostgrestClient:
return None
def get_crossref(self, doi):
- resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.'+doi))
+ resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.' + doi))
resp.raise_for_status()
resp = resp.json()
if resp:
@@ -99,8 +97,8 @@ class SandcrawlerPostgrestClient:
else:
return None
-class SandcrawlerPostgresClient:
+class SandcrawlerPostgresClient:
def __init__(self, db_url, **kwargs):
self.conn = psycopg2.connect(db_url)
@@ -135,14 +133,8 @@ class SandcrawlerPostgresClient:
batch = [d for d in batch if d.get('warc_path')]
if not batch:
return (0, 0)
- batch = [(d['url'],
- d['datetime'],
- d['sha1hex'],
- d['mimetype'],
- d['warc_path'],
- int(d['warc_csize']),
- int(d['warc_offset']))
- for d in batch]
+ batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'],
+ int(d['warc_csize']), int(d['warc_offset'])) for d in batch]
# filter out duplicate rows by key (url, datetime)
batch_dict = dict()
for b in batch:
@@ -170,12 +162,8 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(d['sha1hex'],
- d['sha256hex'],
- d['md5hex'],
- int(d['size_bytes']),
- d['mimetype'])
- for d in batch]
+ batch = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']),
+ d['mimetype']) for d in batch]
# filter out duplicate rows by key (sha1hex)
batch_dict = dict()
for b in batch:
@@ -215,15 +203,15 @@ class SandcrawlerPostgresClient:
r[k] = r['metadata'].get(k)
r['metadata'].pop(k, None)
r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
- batch = [(d['key'],
- d.get('grobid_version') or None,
- d['status_code'],
- d['status'],
- d.get('fatcat_release') or None,
- d.get('updated') or datetime.datetime.now(),
- d.get('metadata') or None ,
- )
- for d in batch]
+ batch = [(
+ d['key'],
+ d.get('grobid_version') or None,
+ d['status_code'],
+ d['status'],
+ d.get('fatcat_release') or None,
+ d.get('updated') or datetime.datetime.now(),
+ d.get('metadata') or None,
+ ) for d in batch]
# filter out duplicate rows by key (sha1hex)
batch_dict = dict()
for b in batch:
@@ -331,20 +319,18 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [
- (
- d['key'],
- d.get('updated') or datetime.datetime.now(),
- d['status_code'],
- d['status'],
- d.get('versions', {}).get('pdftrio_version') or None,
- d.get('versions', {}).get('models_date') or None,
- d.get('ensemble_score'),
- d.get('bert_score'),
- d.get('linear_score'),
- d.get('image_score'),
- )
- for d in batch]
+ batch = [(
+ d['key'],
+ d.get('updated') or datetime.datetime.now(),
+ d['status_code'],
+ d['status'],
+ d.get('versions', {}).get('pdftrio_version') or None,
+ d.get('versions', {}).get('models_date') or None,
+ d.get('ensemble_score'),
+ d.get('bert_score'),
+ d.get('linear_score'),
+ d.get('image_score'),
+ ) for d in batch]
# filter out duplicate rows by key (sha1hex)
batch_dict = dict()
for b in batch:
@@ -373,15 +359,15 @@ class SandcrawlerPostgresClient:
extra[k] = r[k]
if extra:
r['extra'] = json.dumps(extra, sort_keys=True)
- batch = [(d['link_source'],
- d['link_source_id'],
- d['ingest_type'],
- d['base_url'],
- d.get('ingest_request_source'),
- d.get('release_stage') or None,
- d.get('extra') or None,
- )
- for d in batch]
+ batch = [(
+ d['link_source'],
+ d['link_source_id'],
+ d['ingest_type'],
+ d['base_url'],
+ d.get('ingest_request_source'),
+ d.get('release_stage') or None,
+ d.get('extra') or None,
+ ) for d in batch]
# filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url)
batch_dict = dict()
for b in batch:
@@ -412,16 +398,16 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(d['ingest_type'],
- d['base_url'],
- bool(d['hit']),
- d['status'],
- d.get('terminal_url'),
- d.get('terminal_dt'),
- d.get('terminal_status_code'),
- d.get('terminal_sha1hex'),
- )
- for d in batch]
+ batch = [(
+ d['ingest_type'],
+ d['base_url'],
+ bool(d['hit']),
+ d['status'],
+ d.get('terminal_url'),
+ d.get('terminal_dt'),
+ d.get('terminal_status_code'),
+ d.get('terminal_sha1hex'),
+ ) for d in batch]
# filter out duplicate rows by key (ingest_type, base_url)
batch_dict = dict()
for b in batch:
@@ -459,23 +445,23 @@ class SandcrawlerPostgresClient:
else:
raise NotImplementedError("on_conflict: {}".format(on_conflict))
sql += " RETURNING xmax;"
- batch = [(d['ingest_type'],
- d['base_url'],
- bool(d['hit']),
- d['status'],
- d.get('platform_name'),
- d.get('platform_domain'),
- d.get('platform_id'),
- d.get('ingest_strategy'),
- d.get('total_size'),
- d.get('file_count'),
- d.get('archiveorg_item_name'),
- d.get('archiveorg_item_bundle_path'),
- d.get('web_bundle_url'),
- d.get('web_bundle_dt'),
- d.get('manifest'),
- )
- for d in batch]
+ batch = [(
+ d['ingest_type'],
+ d['base_url'],
+ bool(d['hit']),
+ d['status'],
+ d.get('platform_name'),
+ d.get('platform_domain'),
+ d.get('platform_id'),
+ d.get('ingest_strategy'),
+ d.get('total_size'),
+ d.get('file_count'),
+ d.get('archiveorg_item_name'),
+ d.get('archiveorg_item_bundle_path'),
+ d.get('web_bundle_url'),
+ d.get('web_bundle_dt'),
+ d.get('manifest'),
+ ) for d in batch]
# filter out duplicate rows by key (ingest_type, base_url)
batch_dict = dict()
for b in batch:
diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py
index 92fed37..f3441c9 100644
--- a/python/sandcrawler/fileset_platforms.py
+++ b/python/sandcrawler/fileset_platforms.py
@@ -1,4 +1,3 @@
-
import gzip
import json
import sys
@@ -16,17 +15,18 @@ from sandcrawler.ia import ResourceResult
class FilesetPlatformHelper():
-
def __init__(self):
self.platform_name = 'unknown'
- def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> bool:
"""
Does this request look like it matches this platform?
"""
raise NotImplementedError()
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -40,19 +40,18 @@ class FilesetPlatformHelper():
# XXX: while developing ArchiveorgFileset path
#return IngestStrategy.ArchiveorgFileset
if len(item.manifest) == 1:
- if total_size < 64*1024*1024:
+ if total_size < 64 * 1024 * 1024:
return IngestStrategy.WebFile
else:
return IngestStrategy.ArchiveorgFile
else:
- if largest_size < 64*1024*1024 and total_size < 128*1024*1024*1024:
+ if largest_size < 64 * 1024 * 1024 and total_size < 128 * 1024 * 1024 * 1024:
return IngestStrategy.WebFileset
else:
return IngestStrategy.ArchiveorgFileset
class DataverseHelper(FilesetPlatformHelper):
-
def __init__(self):
super().__init__()
self.platform_name = 'dataverse'
@@ -122,8 +121,8 @@ class DataverseHelper(FilesetPlatformHelper):
"file_id": file_id,
}
-
- def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
else:
@@ -146,7 +145,8 @@ class DataverseHelper(FilesetPlatformHelper):
return True
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
@@ -179,22 +179,27 @@ class DataverseHelper(FilesetPlatformHelper):
if parsed_id['file_id']:
# XXX: maybe we could support this?
- raise PlatformScopeError(f"only entire dataverse datasets can be archived with this tool")
+ raise PlatformScopeError(
+ f"only entire dataverse datasets can be archived with this tool")
# 1b. if we didn't get a version number from URL, fetch it from API
if not dataset_version:
- resp = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}")
+ resp = self.session.get(
+ f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}"
+ )
resp.raise_for_status()
obj = resp.json()
obj_latest = obj['data']['latestVersion']
dataset_version = f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}"
# 2. API fetch
- resp = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}")
+ resp = self.session.get(
+ f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}"
+ )
resp.raise_for_status()
obj = resp.json()
- obj_latest= obj['data']['latestVersion']
+ obj_latest = obj['data']['latestVersion']
assert dataset_version == f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}"
assert platform_id == obj_latest['datasetPersistentId']
@@ -212,15 +217,16 @@ class DataverseHelper(FilesetPlatformHelper):
extra['version'] = row['version']
if 'description' in df:
extra['description'] = df['description']
- manifest.append(FilesetManifestFile(
- path=df.get('originalFileName') or df['filename'],
- size=df.get('originalFileSize') or df['filesize'],
- md5=df['md5'],
- # NOTE: don't get: sha1, sha256
- mimetype=df['contentType'],
- platform_url=platform_url,
- extra=extra or None,
- ))
+ manifest.append(
+ FilesetManifestFile(
+ path=df.get('originalFileName') or df['filename'],
+ size=df.get('originalFileSize') or df['filesize'],
+ md5=df['md5'],
+ # NOTE: don't get: sha1, sha256
+ mimetype=df['contentType'],
+ platform_url=platform_url,
+ extra=extra or None,
+ ))
platform_sub_id = platform_id.split('/')[-1]
archiveorg_item_name = f"{platform_domain}-{platform_sub_id}-v{dataset_version}"
@@ -228,7 +234,8 @@ class DataverseHelper(FilesetPlatformHelper):
# XXX: collection=platform_domain,
collection="datasets",
date=obj_latest['releaseTime'].split('T')[0],
- source=f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}",
+ source=
+ f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}",
)
if platform_id.startswith('doi:10.'):
archiveorg_item_meta['doi'] = platform_id.replace('doi:', '')
@@ -238,7 +245,8 @@ class DataverseHelper(FilesetPlatformHelper):
elif block['typeName'] == 'depositor':
archiveorg_item_meta['creator'] = block['value']
elif block['typeName'] == 'dsDescription':
- archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue']['value']
+ archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue'][
+ 'value']
archiveorg_item_meta['description'] = archiveorg_item_meta.get('description', '')
if obj_latest.get('termsOfUse'):
@@ -252,11 +260,13 @@ class DataverseHelper(FilesetPlatformHelper):
platform_id=platform_id,
archiveorg_item_name=archiveorg_item_name,
archiveorg_item_meta=archiveorg_item_meta,
- web_bundle_url=f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original",
+ web_bundle_url=
+ f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original",
# TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files)
extra=dict(version=dataset_version),
)
+
def test_parse_dataverse_persistentid():
valid = {
@@ -322,8 +332,8 @@ def test_parse_dataverse_persistentid():
except ValueError:
pass
-class FigshareHelper(FilesetPlatformHelper):
+class FigshareHelper(FilesetPlatformHelper):
def __init__(self):
super().__init__()
self.platform_name = 'figshare'
@@ -346,7 +356,9 @@ class FigshareHelper(FilesetPlatformHelper):
raise ValueError(f"not a figshare URL: {path}")
comp = comp[2:]
- if comp[0] in ['dataset',]:
+ if comp[0] in [
+ 'dataset',
+ ]:
comp = comp[1:]
if len(comp) == 3 and comp[1].isdigit() and comp[2].isdigit():
@@ -356,7 +368,8 @@ class FigshareHelper(FilesetPlatformHelper):
else:
raise ValueError(f"couldn't find figshare identiier: {path}")
- def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
@@ -381,7 +394,8 @@ class FigshareHelper(FilesetPlatformHelper):
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -397,13 +411,15 @@ class FigshareHelper(FilesetPlatformHelper):
(platform_id, dataset_version) = self.parse_figshare_url_path(components.path)
assert platform_id.isdigit(), f"expected numeric: {platform_id}"
- assert dataset_version and dataset_version.isdigit(), f"expected numeric: {dataset_version}"
+ assert dataset_version and dataset_version.isdigit(
+ ), f"expected numeric: {dataset_version}"
# 1b. if we didn't get a version number from URL, fetch it from API
# TODO: implement this code path
# 2. API fetch
- resp = self.session.get(f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}")
+ resp = self.session.get(
+ f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}")
resp.raise_for_status()
obj = resp.json()
@@ -412,18 +428,21 @@ class FigshareHelper(FilesetPlatformHelper):
if not obj['is_public']:
raise PlatformRestrictedError(f'record not public: {platform_id} {dataset_version}')
if obj['is_embargoed']:
- raise PlatformRestrictedError(f'record is embargoed: {obj.get("embargo_title")} ({platform_id} {dataset_version})')
+ raise PlatformRestrictedError(
+ f'record is embargoed: {obj.get("embargo_title")} ({platform_id} {dataset_version})'
+ )
manifest = []
for row in obj['files']:
- manifest.append(FilesetManifestFile(
- path=row['name'],
- size=row['size'],
- md5=row['computed_md5'],
- # NOTE: don't get: sha1, sha256, mimetype
- platform_url=row['download_url'],
- #extra=dict(),
- ))
+ manifest.append(
+ FilesetManifestFile(
+ path=row['name'],
+ size=row['size'],
+ md5=row['computed_md5'],
+ # NOTE: don't get: sha1, sha256, mimetype
+ platform_url=row['download_url'],
+ #extra=dict(),
+ ))
assert not row.get('is_link_only')
authors = []
@@ -451,18 +470,23 @@ class FigshareHelper(FilesetPlatformHelper):
platform_id=platform_id,
archiveorg_item_name=archiveorg_item_name,
archiveorg_item_meta=archiveorg_item_meta,
- web_bundle_url=f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}",
+ web_bundle_url=
+ f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}",
# TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files)
extra=dict(version=dataset_version),
)
+
def test_parse_figshare_url_path():
valid = {
- "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1": ("8987858", "1"),
- "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858": ("8987858", None),
+ "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1":
+ ("8987858", "1"),
+ "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858":
+ ("8987858", None),
"/articles/CIBERSORT_p-value_0_05/8217188/1": ("8217188", "1"),
- "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4": ("12127176", "4"),
+ "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4":
+ ("12127176", "4"),
}
invalid = [
@@ -479,14 +503,15 @@ def test_parse_figshare_url_path():
except ValueError:
pass
-class ZenodoHelper(FilesetPlatformHelper):
+class ZenodoHelper(FilesetPlatformHelper):
def __init__(self):
super().__init__()
self.platform_name = 'zenodo'
self.session = requests.Session()
- def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
@@ -499,7 +524,8 @@ class ZenodoHelper(FilesetPlatformHelper):
return True
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -535,12 +561,15 @@ class ZenodoHelper(FilesetPlatformHelper):
assert obj['id'] == int(platform_id)
work_id = obj['conceptrecid']
if work_id == obj['id']:
- raise PlatformScopeError("got a work-level zenodo record, not a versioned record: {work_id}")
+ raise PlatformScopeError(
+ "got a work-level zenodo record, not a versioned record: {work_id}")
zenodo_type = obj['metadata']['resource_type']['type']
if obj['metadata']['access_right'] != 'open':
- raise PlatformRestrictedError("not publicly available ({obj['metadata']['access_right']}): {platform_domain} {platform_id}")
+ raise PlatformRestrictedError(
+ "not publicly available ({obj['metadata']['access_right']}): {platform_domain} {platform_id}"
+ )
manifest = []
for row in obj['files']:
@@ -600,11 +629,9 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
'RAR': 'application/vnd.rar',
'TAR': 'application/x-tar',
'7z': 'application/x-7z-compressed',
-
'HTML': 'text/html',
'Text': 'text/plain',
'PDF': 'application/pdf',
-
'CSV': 'text/csv',
'XML': 'application/xml',
'JSON': 'application/json',
@@ -613,17 +640,13 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
#'application/vnd.openxmlformats-officedocument.wordprocessingml.document', # .docx
#'application/vnd.ms-excel', # .xls
#'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx
-
- 'MP3': 'audio/mpeg', # .mp3
-
- 'MP4': 'video/mp4', # .mp4
- 'MPEG': 'video/mpeg', # .mpeg
-
+ 'MP3': 'audio/mpeg', # .mp3
+ 'MP4': 'video/mp4', # .mp4
+ 'MPEG': 'video/mpeg', # .mpeg
'JPEG': 'image/jpeg',
'GIF': 'image/gif',
'PNG': 'image/png',
'TIFF': 'image/tiff',
-
'Unknown': None,
}
@@ -640,24 +663,27 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
if f.source != 'original':
return False
for suffix in [
- '_meta.sqlite',
- '_archive.torrent',
- '_itemimage.jpg',
- '_meta.xml',
- '_thumb.png',
- '_files.xml',
+ '_meta.sqlite',
+ '_archive.torrent',
+ '_itemimage.jpg',
+ '_meta.xml',
+ '_thumb.png',
+ '_files.xml',
]:
if f.name == item_name + suffix or f.name == item_name.lower() + suffix:
return False
if f.name.startswith('_'):
return False
if item_name.startswith('academictorrents_'):
- for suffix in ['_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib']:
+ for suffix in [
+ '_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib'
+ ]:
if f.name == item_name + suffix:
return False
return True
- def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> bool:
if resource and resource.terminal_url:
url = resource.terminal_url
@@ -672,20 +698,23 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
return True
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult],
+ html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
base_url_split = request['base_url'].split('/')
#print(base_url_split, file=sys.stderr)
- assert len(base_url_split) in [5,6]
+ assert len(base_url_split) in [5, 6]
assert base_url_split[0] in ['http:', 'https:']
assert base_url_split[2] == 'archive.org'
assert base_url_split[3] in ['details', 'download']
item_name = base_url_split[4]
if len(base_url_split) == 6 and base_url_split[5]:
- raise PlatformScopeError("got an archive.org file path, not download/details page; individual files not handled yet")
+ raise PlatformScopeError(
+ "got an archive.org file path, not download/details page; individual files not handled yet"
+ )
#print(f" archiveorg processing item={item_name}", file=sys.stderr)
item = self.session.get_item(item_name)
diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
index c9f182c..6c25276 100644
--- a/python/sandcrawler/fileset_strategies.py
+++ b/python/sandcrawler/fileset_strategies.py
@@ -1,4 +1,3 @@
-
import gzip
import json
import os
@@ -10,15 +9,15 @@ from typing import Any, Dict, List, Optional, Tuple
import internetarchive
-from sandcrawler.fileset_types import (ArchiveStrategyResult, FilesetManifestFile, FilesetPlatformItem, IngestStrategy,
- PlatformScopeError)
+from sandcrawler.fileset_types import (ArchiveStrategyResult, FilesetManifestFile,
+ FilesetPlatformItem, IngestStrategy, PlatformScopeError)
from sandcrawler.html_metadata import BiblioMetadata
-from sandcrawler.ia import ResourceResult, SavePageNowClient, WaybackClient, fix_transfer_encoding
+from sandcrawler.ia import (ResourceResult, SavePageNowClient, WaybackClient,
+ fix_transfer_encoding)
from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path, sanitize_fs_path
class FilesetIngestStrategy():
-
def __init__(self):
#self.ingest_strategy = 'unknown'
self.success_status = "success"
@@ -31,7 +30,6 @@ class FilesetIngestStrategy():
class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
-
def __init__(self, **kwargs):
super().__init__()
self.ingest_strategy = IngestStrategy.ArchiveorgFileset
@@ -61,7 +59,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
found = False
for existing in item_files:
if existing.name == wanted.path:
- if ((existing.sha1 and existing.sha1 == wanted.sha1) or (existing.md5 and existing.md5 == wanted.md5)) and existing.name == wanted.path and existing.size == wanted.size:
+ if ((existing.sha1 and existing.sha1 == wanted.sha1) or
+ (existing.md5 and existing.md5 == wanted.md5)
+ ) and existing.name == wanted.path and existing.size == wanted.size:
found = True
wanted.status = 'exists'
break
@@ -69,7 +69,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
wanted.status = 'mismatch-existing'
break
if not found:
- print(f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}", file=sys.stderr)
+ print(
+ f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}",
+ file=sys.stderr)
return None
return ArchiveStrategyResult(
ingest_strategy=self.ingest_strategy,
@@ -108,10 +110,11 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
if not os.path.exists(local_path):
print(f" downloading {m.path}", file=sys.stderr)
- with self.ia_session.get(m.platform_url, stream=True, allow_redirects=True) as r:
+ with self.ia_session.get(m.platform_url, stream=True,
+ allow_redirects=True) as r:
r.raise_for_status()
with open(local_path + '.partial', 'wb') as f:
- for chunk in r.iter_content(chunk_size=256*1024):
+ for chunk in r.iter_content(chunk_size=256 * 1024):
f.write(chunk)
os.rename(local_path + '.partial', local_path)
m.status = 'downloaded-local'
@@ -120,7 +123,8 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
print(f" verifying {m.path}", file=sys.stderr)
file_meta = gen_file_metadata_path(local_path, allow_empty=True)
- assert file_meta['size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}"
+ assert file_meta[
+ 'size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}"
if m.sha1:
assert file_meta['sha1hex'] == m.sha1
@@ -142,7 +146,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
if file_meta['mimetype'] != m.mimetype and file_meta['mimetype'] != 'text/plain':
# these 'tab-separated-values' from dataverse are just noise, don't log them
if m.mimetype != 'text/tab-separated-values':
- print(f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}", file=sys.stderr)
+ print(
+ f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}",
+ file=sys.stderr)
m.mimetype = file_meta['mimetype']
else:
m.mimetype = file_meta['mimetype']
@@ -158,7 +164,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
'remote_name': m.path,
})
- print(f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...", file=sys.stderr)
+ print(
+ f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...",
+ file=sys.stderr)
internetarchive.upload(
item.archiveorg_item_name,
files=item_files,
@@ -183,25 +191,26 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
return result
+
class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy):
"""
ArchiveorgFilesetStrategy currently works fine with individual files. Just
need to over-ride the ingest_strategy name.
"""
-
def __init__(self):
super().__init__()
self.ingest_strategy = IngestStrategy.ArchiveorgFileset
self.success_status = "success-file"
-class WebFilesetStrategy(FilesetIngestStrategy):
+class WebFilesetStrategy(FilesetIngestStrategy):
def __init__(self, **kwargs):
super().__init__()
self.ingest_strategy = IngestStrategy.WebFileset
self.wayback_client = WaybackClient()
self.try_spn2 = True
- self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0))
+ self.spn_client = SavePageNowClient(
+ spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0))
self.max_spn_manifest = 20
def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
@@ -218,23 +227,26 @@ class WebFilesetStrategy(FilesetIngestStrategy):
for m in item.manifest:
fetch_url = m.platform_url
if not fetch_url:
- raise NotImplementedError("require 'platform_url' for each file when doing Web fetching")
+ raise NotImplementedError(
+ "require 'platform_url' for each file when doing Web fetching")
via = "wayback"
resource = self.wayback_client.lookup_resource(fetch_url, m.mimetype)
- if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture')):
+ if self.try_spn2 and (resource == None or
+ (resource and resource.status == 'no-capture')):
if len(item.manifest) > self.max_spn_manifest:
m.status = 'too-much-spn'
continue
via = "spn2"
- resource = self.spn_client.crawl_resource(fetch_url, self.wayback_client, force_simple_get=True)
+ resource = self.spn_client.crawl_resource(fetch_url,
+ self.wayback_client,
+ force_simple_get=True)
- print("[FETCH {:>6}] {} {}".format(
- via,
- (resource and resource.status),
- (resource and resource.terminal_url) or fetch_url),
- file=sys.stderr)
+ print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status),
+ (resource and resource.terminal_url)
+ or fetch_url),
+ file=sys.stderr)
m.terminal_url = resource.terminal_url
m.terminal_dt = resource.terminal_dt
@@ -251,9 +263,11 @@ class WebFilesetStrategy(FilesetIngestStrategy):
file_meta, html_resource = fix_transfer_encoding(file_meta, resource)
if self.ingest_strategy == "web-file":
- file_file_meta = file_meta
+ file_file_meta = file_meta
- if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex']) or (m.sha1 and m.sha1 != file_meta['sha1hex']):
+ if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex']
+ ) or (m.sha1
+ and m.sha1 != file_meta['sha1hex']):
m.status = 'mismatch'
continue
@@ -280,8 +294,8 @@ class WebFilesetStrategy(FilesetIngestStrategy):
result.file_resource = file_resource
return result
-class WebFileStrategy(WebFilesetStrategy):
+class WebFileStrategy(WebFilesetStrategy):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.ingest_strategy = IngestStrategy.WebFile
diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py
index 8ea136e..606af07 100644
--- a/python/sandcrawler/fileset_types.py
+++ b/python/sandcrawler/fileset_types.py
@@ -1,4 +1,3 @@
-
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
@@ -13,6 +12,7 @@ class IngestStrategy(str, Enum):
ArchiveorgFileset = "archiveorg-fileset"
ArchiveorgFilesetBundled = "archiveorg-fileset-bundled"
+
class FilesetManifestFile(BaseModel):
path: str
size: Optional[int]
@@ -27,6 +27,7 @@ class FilesetManifestFile(BaseModel):
terminal_url: Optional[str]
terminal_dt: Optional[str]
+
class FilesetPlatformItem(BaseModel):
platform_name: str
platform_status: str
@@ -39,6 +40,7 @@ class FilesetPlatformItem(BaseModel):
web_base_url: Optional[str]
web_bundle_url: Optional[str]
+
class ArchiveStrategyResult(BaseModel):
ingest_strategy: str
status: str
@@ -49,6 +51,7 @@ class ArchiveStrategyResult(BaseModel):
bundle_resource: Optional[Any]
bundle_archiveorg_path: Optional[str]
+
class PlatformScopeError(Exception):
"""
For incidents where platform helper discovers that the fileset/dataset is
@@ -61,6 +64,7 @@ class PlatformScopeError(Exception):
"""
pass
+
class PlatformRestrictedError(Exception):
"""
When datasets are not publicly available on a platform (yet)
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index 5242b3a..16bbb01 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -1,4 +1,3 @@
-
import requests
from grobid2json import teixml2json
@@ -8,7 +7,6 @@ from .workers import SandcrawlerFetchWorker, SandcrawlerWorker
class GrobidClient(object):
-
def __init__(self, host_url="http://grobid.qa.fatcat.wiki", **kwargs):
self.host_url = host_url
self.consolidate_mode = int(kwargs.get('consolidate_mode', 0))
@@ -34,7 +32,7 @@ class GrobidClient(object):
files={
'input': blob,
'consolidateHeader': self.consolidate_mode,
- 'consolidateCitations': 0, # too expensive for now
+ 'consolidateCitations': 0, # too expensive for now
'includeRawCitations': 1,
},
timeout=180.0,
@@ -46,9 +44,7 @@ class GrobidClient(object):
'error_msg': 'GROBID request (HTTP POST) timeout',
}
- info = dict(
- status_code=grobid_response.status_code,
- )
+ info = dict(status_code=grobid_response.status_code, )
if grobid_response.status_code == 200:
info['status'] = 'success'
info['tei_xml'] = grobid_response.text
@@ -56,7 +52,8 @@ class GrobidClient(object):
# XML is larger than Kafka message size, and much larger than
# an article in general; bail out
info['status'] = 'error'
- info['error_msg'] = "response XML too large: {} bytes".format(len(info['tei_xml']))
+ info['error_msg'] = "response XML too large: {} bytes".format(
+ len(info['tei_xml']))
info.pop('tei_xml')
else:
# response.text is .content decoded as utf-8
@@ -70,7 +67,13 @@ class GrobidClient(object):
tei_json = teixml2json(result['tei_xml'], encumbered=False)
meta = dict()
biblio = dict()
- for k in ('title', 'authors', 'journal', 'date', 'doi', ):
+ for k in (
+ 'title',
+ 'authors',
+ 'journal',
+ 'date',
+ 'doi',
+ ):
if tei_json.get(k):
biblio[k] = tei_json[k]
meta['biblio'] = biblio
@@ -79,8 +82,8 @@ class GrobidClient(object):
meta[k] = tei_json[k]
return meta
-class GrobidWorker(SandcrawlerFetchWorker):
+class GrobidWorker(SandcrawlerFetchWorker):
def __init__(self, grobid_client, wayback_client=None, sink=None, **kwargs):
super().__init__(wayback_client=wayback_client)
self.grobid_client = grobid_client
@@ -104,18 +107,19 @@ class GrobidWorker(SandcrawlerFetchWorker):
return fetch_result
blob = fetch_result['blob']
- result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
+ result = self.grobid_client.process_fulltext(blob,
+ consolidate_mode=self.consolidate_mode)
result['file_meta'] = gen_file_metadata(blob)
result['source'] = record
result['key'] = result['file_meta']['sha1hex']
return result
+
class GrobidBlobWorker(SandcrawlerWorker):
"""
This is sort of like GrobidWorker, except it receives blobs directly,
instead of fetching blobs from some remote store.
"""
-
def __init__(self, grobid_client, sink=None, **kwargs):
super().__init__()
self.grobid_client = grobid_client
@@ -125,8 +129,8 @@ class GrobidBlobWorker(SandcrawlerWorker):
def process(self, blob, key=None):
if not blob:
return None
- result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
+ result = self.grobid_client.process_fulltext(blob,
+ consolidate_mode=self.consolidate_mode)
result['file_meta'] = gen_file_metadata(blob)
result['key'] = result['file_meta']['sha1hex']
return result
-
diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py
index 6bdebdd..a44fc67 100644
--- a/python/sandcrawler/html.py
+++ b/python/sandcrawler/html.py
@@ -1,4 +1,3 @@
-
import json
import re
import sys
@@ -6,7 +5,8 @@ import urllib.parse
from bs4 import BeautifulSoup
-RESEARCHSQUARE_REGEX = re.compile(r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"')
+RESEARCHSQUARE_REGEX = re.compile(
+ r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"')
IEEEXPLORE_REGEX = re.compile(r'"pdfPath":"(/.*?\.pdf)"')
OVID_JOURNAL_URL_REGEX = re.compile(r'journalURL = "(http.*)";')
SCIENCEDIRECT_BOUNCE_URL_REGEX = re.compile(r"window.location = '(http.*)';")
@@ -33,16 +33,16 @@ def extract_fulltext_url(html_url, html_body):
### General Tricks ###
# highwire-style meta tag
- meta = soup.find('meta', attrs={"name":"citation_pdf_url"})
+ meta = soup.find('meta', attrs={"name": "citation_pdf_url"})
if not meta:
- meta = soup.find('meta', attrs={"name":"bepress_citation_pdf_url"})
+ meta = soup.find('meta', attrs={"name": "bepress_citation_pdf_url"})
if not meta:
- meta = soup.find('meta', attrs={"name":"wkhealth_pdf_url"})
+ meta = soup.find('meta', attrs={"name": "wkhealth_pdf_url"})
if not meta:
# researchgate does this; maybe others also?
- meta = soup.find('meta', attrs={"property":"citation_pdf_url"})
+ meta = soup.find('meta', attrs={"property": "citation_pdf_url"})
if not meta:
- meta = soup.find('meta', attrs={"name":"eprints.document_url"})
+ meta = soup.find('meta', attrs={"name": "eprints.document_url"})
# if tag is only partially populated
if meta and not meta.get('content'):
meta = None
@@ -52,10 +52,10 @@ def extract_fulltext_url(html_url, html_body):
if '://doi.org/' in url:
print(f"\tdoi.org in citation_pdf_url (loop?): {url}", file=sys.stderr)
elif url.startswith('/'):
- if host_prefix+url == html_url:
+ if host_prefix + url == html_url:
print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr)
else:
- return dict(pdf_url=host_prefix+url, technique='citation_pdf_url')
+ return dict(pdf_url=host_prefix + url, technique='citation_pdf_url')
elif url.startswith('http'):
if url == html_url:
print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr)
@@ -64,7 +64,7 @@ def extract_fulltext_url(html_url, html_body):
else:
print("\tmalformed citation_pdf_url? {}".format(url), file=sys.stderr)
- meta = soup.find('meta', attrs={"name":"generator"})
+ meta = soup.find('meta', attrs={"name": "generator"})
meta_generator = None
if meta and meta.get('content'):
meta_generator = meta['content'].strip()
@@ -105,7 +105,8 @@ def extract_fulltext_url(html_url, html_body):
json_meta = json.loads(json_text)
pdf_meta = json_meta['article']['pdfDownload']['urlMetadata']
# https://www.sciencedirect.com/science/article/pii/S0169204621000670/pdfft?md5=c4a83d06b334b627ded74cf9423bfa56&pid=1-s2.0-S0169204621000670-main.pdf
- url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams']['md5'] + "&pid=" + pdf_meta['queryParams']['pid']
+ url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams'][
+ 'md5'] + "&pid=" + pdf_meta['queryParams']['pid']
except (KeyError, TypeError, json.JSONDecodeError):
pass
if url:
@@ -130,7 +131,9 @@ def extract_fulltext_url(html_url, html_body):
if m:
url = m.group(1)
assert len(url) < 4096
- return dict(release_stage="published", pdf_url=host_prefix+url, technique="ieeexplore")
+ return dict(release_stage="published",
+ pdf_url=host_prefix + url,
+ technique="ieeexplore")
# https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=8730313
if '://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber' in html_url:
# HTML iframe like:
@@ -172,11 +175,12 @@ def extract_fulltext_url(html_url, html_body):
'://thesiscommons.org/',
]
for domain in OSF_DOMAINS:
- if domain in html_url and (len(html_url.split('/')) in [4,5] or '/preprints/' in html_url) and '/download' not in html_url:
+ if domain in html_url and (len(html_url.split('/')) in [4, 5] or '/preprints/'
+ in html_url) and '/download' not in html_url:
if not html_url.endswith("/"):
- next_url = html_url+"/download"
+ next_url = html_url + "/download"
else:
- next_url = html_url+"download"
+ next_url = html_url + "download"
return dict(next_url=next_url, technique='osf-by-url')
# wiley
@@ -199,14 +203,14 @@ def extract_fulltext_url(html_url, html_body):
url = html_url.replace("/doi/10.", "/doi/pdf/10.")
return dict(pdf_url=url, technique='archivist-url')
# <a href="/doi/pdf/10.17723/aarc.62.2.j475270470145630" target="_blank">
- hrefs = soup.find_all('a', attrs={"target":"_blank"})
+ hrefs = soup.find_all('a', attrs={"target": "_blank"})
for href in hrefs:
url = href['href'].strip()
if "/doi/pdf/" in url:
if url.startswith('http'):
return dict(pdf_url=url, technique='publisher-href')
elif url.startswith('/'):
- return dict(pdf_url=host_prefix+url, technique='publisher-href')
+ return dict(pdf_url=host_prefix + url, technique='publisher-href')
# protocols.io
# https://www.protocols.io/view/flow-cytometry-protocol-mgdc3s6
@@ -248,7 +252,8 @@ def extract_fulltext_url(html_url, html_body):
if "://ehp.niehs.nih.gov/doi/" in html_url:
# <a href="/doi/pdf/10.1289/EHP4709" target="_blank">
if b'/doi/pdf/10.' in html_body:
- url = html_url.replace('/doi/full/10.', '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.')
+ url = html_url.replace('/doi/full/10.',
+ '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.')
return dict(pdf_url=url, technique='ehp.niehs.nigh.gov-url')
# cogentoa.com
@@ -275,7 +280,7 @@ def extract_fulltext_url(html_url, html_body):
# http://en.gzbd.cnki.net/gzbt/detail/detail.aspx?FileName=HBGF202002003&DbName=GZBJ7920&DbCode=GZBJ
if '://en.gzbd.cnki.net/KCMS/detail/detail.aspx' in html_url:
# <a onclick="WriteKrsDownLog()" target="_blank" id="pdfDown" name="pdfDown" href="/gzbt/download.aspx?filename=4Q1ZYpFdKFUZ6FDR1QkRrolayRXV2ZzattyQ3QFa2JXTyZXUSV3QRFkbndzaGV2KyJXWZVEbFdVYnZndD9EOxg1Tj5Eeys2SMFzLZ5kcuFkM3dEbsR2ZjxEaShVdJhFdp90KhlVVzcjVVlXUVNHWBtWS5Rlb5cnc&amp;tablename=GZBJLAST2020&amp;dflag=pdfdown&#xA; "><i></i>PDF Download</a>
- href = soup.find('a', attrs={"id":"pdfDown"})
+ href = soup.find('a', attrs={"id": "pdfDown"})
if href:
url = href['href'].strip().replace('&#xA;', '')
if not url.startswith('http'):
@@ -300,7 +305,7 @@ def extract_fulltext_url(html_url, html_body):
# OJS 3 (some)
if meta_generator and meta_generator.startswith("Open Journal Systems"):
- href = soup.find('a', attrs={"class":"obj_galley_link file"})
+ href = soup.find('a', attrs={"class": "obj_galley_link file"})
if href and href.text and "pdf" in href.text.lower():
url = href['href'].strip()
if url.startswith('/'):
@@ -329,13 +334,15 @@ def extract_fulltext_url(html_url, html_body):
return dict()
+
def test_regex():
lines = """
blah
var journalURL = "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689";
asdf"""
m = OVID_JOURNAL_URL_REGEX.search(lines)
- assert m.group(1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"
+ assert m.group(
+ 1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"
lines = """
window.onload = function () {
diff --git a/python/sandcrawler/html_metadata.py b/python/sandcrawler/html_metadata.py
index c6725dc..6d27a3a 100644
--- a/python/sandcrawler/html_metadata.py
+++ b/python/sandcrawler/html_metadata.py
@@ -1,4 +1,3 @@
-
import datetime
import sys
import urllib.parse
@@ -31,9 +30,7 @@ HEAD_META_PATTERNS: Any = {
"meta[name='dcterms.title']",
"meta[name='dc.title']",
],
- "subtitle": [
- "meta[name='prism.subtitle']",
- ],
+ "subtitle": ["meta[name='prism.subtitle']", ],
"doi": [
"meta[name='citation_doi']",
"meta[name='DOI']",
@@ -43,9 +40,7 @@ HEAD_META_PATTERNS: Any = {
"meta[name='dc.identifier.doi']",
"meta[name='dc.identifier'][scheme='doi']",
],
- "pmid": [
- "meta[name='citation_pmid']",
- ],
+ "pmid": ["meta[name='citation_pmid']", ],
"abstract": [
"meta[name='citation_abstract']",
"meta[name='bepress_citation_abstract']",
@@ -66,9 +61,7 @@ HEAD_META_PATTERNS: Any = {
"meta[name='dc.source']",
"meta[property='og:site_name']",
],
- "container_abbrev": [
- "meta[name='citation_journal_abbrev']",
- ],
+ "container_abbrev": ["meta[name='citation_journal_abbrev']", ],
"raw_date": [
"meta[name='citation_publication_date']",
"meta[name='bepress_citation_publication_date']",
@@ -169,9 +162,7 @@ HEAD_META_LIST_PATTERNS: Any = {
"meta[name='dc.contributor']",
],
# TODO: citation_author_institution
- "raw_references": [
- "meta[name='citation_reference']",
- ],
+ "raw_references": ["meta[name='citation_reference']", ],
"raw_identifiers": [
"meta[name='eprints.id_number']",
"meta[name='dcterms.identifier']",
@@ -260,7 +251,7 @@ HTML_FULLTEXT_PATTERNS: List[dict] = [
COMPONENT_FULLTEXT_PATTERNS: List[dict] = [
{
- "in_doc_url": "pensoft.net/article/", # also /element/
+ "in_doc_url": "pensoft.net/article/", # also /element/
"in_fulltext_url": "/download/fig/",
"selector": ".Main-Content .figure a.P-Article-Preview-Picture-Download-Small",
"attr": "href",
@@ -652,12 +643,11 @@ class BiblioMetadata(pydantic.BaseModel):
component_url: Optional[str]
class Config:
- json_encoders = {
- datetime.date: lambda dt: dt.isoformat()
- }
+ json_encoders = {datetime.date: lambda dt: dt.isoformat()}
-def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, patterns: List[dict]) -> Optional[Tuple[str, str]]:
+def html_extract_fulltext_url(doc_url: str, doc: HTMLParser,
+ patterns: List[dict]) -> Optional[Tuple[str, str]]:
"""
Tries to quickly extract fulltext URLs using a set of patterns. This
function is intendend to be generic across various extraction techniques.
@@ -701,6 +691,7 @@ def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, patterns: List[dict
return self_doc_url
return None
+
def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadata]:
meta: Any = dict()
@@ -772,6 +763,7 @@ def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadat
return BiblioMetadata(**meta)
+
def load_adblock_rules() -> braveblock.Adblocker:
"""
TODO: consider blocking very generic assets:
@@ -838,7 +830,8 @@ def _extract_generic(doc: HTMLParser, selector: str, attrs: List[str], type_name
return resources
-def html_extract_resources(doc_url: str, doc: HTMLParser, adblock: braveblock.Adblocker) -> list:
+def html_extract_resources(doc_url: str, doc: HTMLParser,
+ adblock: braveblock.Adblocker) -> list:
"""
This function tries to find all the important resources in a page. The
presumption is that the HTML document is article fulltext, and we want the
@@ -869,10 +862,12 @@ def html_extract_resources(doc_url: str, doc: HTMLParser, adblock: braveblock.Ad
r['url'] = urllib.parse.urljoin(doc_url, r['url'])
# filter using adblocker
- resources = [r for r in resources if adblock.check_network_urls(r['url'], source_url=doc_url, request_type=r['type']) == False]
+ resources = [
+ r for r in resources if adblock.check_network_urls(
+ r['url'], source_url=doc_url, request_type=r['type']) == False
+ ]
# remove duplicates
resources = [dict(t) for t in {tuple(d.items()) for d in resources}]
return resources
-
diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py
index ca1182f..a8ce193 100644
--- a/python/sandcrawler/ia.py
+++ b/python/sandcrawler/ia.py
@@ -1,4 +1,3 @@
-
# XXX: some broken MRO thing going on in here due to python3 object wrangling
# in `wayback` library. Means we can't run pylint.
# pylint: skip-file
@@ -38,6 +37,7 @@ class SandcrawlerBackoffError(Exception):
"""
pass
+
ResourceResult = namedtuple("ResourceResult", [
"start_url",
"hit",
@@ -80,6 +80,7 @@ CdxPartial = namedtuple('CdxPartial', [
'sha1hex',
])
+
def cdx_partial_from_row(full):
return CdxPartial(
surt=full.surt,
@@ -91,6 +92,7 @@ def cdx_partial_from_row(full):
sha1hex=full.sha1hex,
)
+
def cdx_to_dict(cdx):
d = {
"surt": cdx.surt,
@@ -107,6 +109,7 @@ def cdx_to_dict(cdx):
d['warc_path'] = cdx.warc_path
return d
+
def fuzzy_match_url(left, right):
"""
Matches URLs agnostic of http/https (and maybe other normalizations in the
@@ -123,6 +126,7 @@ def fuzzy_match_url(left, right):
return True
return False
+
def test_fuzzy_match_url():
assert fuzzy_match_url("http://thing.com", "http://thing.com") == True
assert fuzzy_match_url("http://thing.com", "https://thing.com") == True
@@ -137,18 +141,19 @@ def test_fuzzy_match_url():
assert fuzzy_match_url("http://www.thing.com", "http://www2.thing.com") == False
assert fuzzy_match_url("http://www.thing.com", "https://www2.thing.com") == False
+
class CdxApiError(Exception):
pass
-class CdxApiClient:
+class CdxApiClient:
def __init__(self, host_url="https://web.archive.org/cdx/search/cdx", **kwargs):
self.host_url = host_url
self.http_session = requests_retry_session(retries=3, backoff_factor=3)
- cdx_auth_token = kwargs.get('cdx_auth_token',
- os.environ.get('CDX_AUTH_TOKEN'))
+ cdx_auth_token = kwargs.get('cdx_auth_token', os.environ.get('CDX_AUTH_TOKEN'))
if not cdx_auth_token:
- raise Exception("CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)")
+ raise Exception(
+ "CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)")
self.http_session.headers.update({
'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient',
'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token),
@@ -208,7 +213,8 @@ class CdxApiClient:
found, because we expect to be looking up a specific full record.
"""
if len(datetime) != 14:
- raise ValueError("CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime))
+ raise ValueError(
+ "CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime))
params = {
'url': url,
'from': datetime,
@@ -226,18 +232,28 @@ class CdxApiClient:
if retry_sleep > 3:
next_sleep = retry_sleep - 3
retry_sleep = 3
- print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr)
+ print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
+ file=sys.stderr)
time.sleep(retry_sleep)
- return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=next_sleep)
+ return self.fetch(url,
+ datetime,
+ filter_status_code=filter_status_code,
+ retry_sleep=next_sleep)
raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime))
row = resp[0]
# allow fuzzy http/https match
if not (fuzzy_match_url(row.url, url) and row.datetime == datetime):
if retry_sleep and retry_sleep > 0:
- print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr)
+ print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep),
+ file=sys.stderr)
time.sleep(retry_sleep)
- return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=None)
- raise KeyError("Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(url, datetime, row))
+ return self.fetch(url,
+ datetime,
+ filter_status_code=filter_status_code,
+ retry_sleep=None)
+ raise KeyError(
+ "Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format(
+ url, datetime, row))
if filter_status_code:
assert row.status_code == filter_status_code
return row
@@ -311,17 +327,20 @@ class CdxApiClient:
class WaybackError(Exception):
pass
+
class WaybackContentError(Exception):
pass
+
class PetaboxError(Exception):
pass
+
class NoCaptureError(Exception):
pass
-class WaybackClient:
+class WaybackClient:
def __init__(self, cdx_client=None, **kwargs):
if cdx_client:
self.cdx_client = cdx_client
@@ -367,32 +386,42 @@ class WaybackClient:
if not self.petabox_webdata_secret:
raise Exception("WaybackClient needs petabox secret to do direct WARC fetches")
if not "/" in warc_path:
- raise ValueError("what looks like a liveweb/SPN temporary warc path: {}".format(warc_path))
+ raise ValueError(
+ "what looks like a liveweb/SPN temporary warc path: {}".format(warc_path))
warc_uri = self.warc_uri_prefix + warc_path
if not self.rstore:
- self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory3(
- webdata_secret=self.petabox_webdata_secret,
- ))
+ self.rstore = ResourceStore(
+ loaderfactory=CDXLoaderFactory3(webdata_secret=self.petabox_webdata_secret, ))
try:
#print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr)
gwb_record = self.rstore.load_resource(warc_uri, offset, csize)
except wayback.exception.ResourceUnavailable:
print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
- raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)")
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (ResourceUnavailable)")
except wayback.exception.InvalidResource:
print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr)
- raise WaybackContentError("failed to load file contents from wayback/petabox (InvalidResource)")
+ raise WaybackContentError(
+ "failed to load file contents from wayback/petabox (InvalidResource)")
except urllib3.exceptions.ReadTimeoutError as rte:
- raise PetaboxError("failed to load file contents from wayback/petabox (ReadTimeoutError: {})".format(rte))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (ReadTimeoutError: {})".
+ format(rte))
except ValueError as ve:
- raise PetaboxError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
except EOFError as eofe:
- raise PetaboxError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
except TypeError as te:
- raise PetaboxError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
+ raise PetaboxError(
+ "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)"
+ .format(te))
except Exception as e:
if "while decompressing data: invalid block type" in str(e):
- raise PetaboxError("decompression error fetching WARC record; usually due to bad alexa ARC files")
+ raise PetaboxError(
+ "decompression error fetching WARC record; usually due to bad alexa ARC files"
+ )
else:
raise e
# Note: could consider a generic "except Exception" here, as we get so
@@ -405,7 +434,8 @@ class WaybackClient:
raise WaybackContentError("too many HTTP headers (in wayback fetch)")
location = gwb_record.get_location() or None
- if status_code is None and gwb_record.target_uri.startswith(b"ftp://") and not gwb_record.is_revisit():
+ if status_code is None and gwb_record.target_uri.startswith(
+ b"ftp://") and not gwb_record.is_revisit():
# TODO: some additional verification here?
status_code = 226
@@ -416,8 +446,9 @@ class WaybackClient:
raise WaybackContentError("found revisit record, but won't resolve (loop?)")
revisit_uri, revisit_dt = gwb_record.refers_to
if not (revisit_uri and revisit_dt):
- raise WaybackContentError("revisit record missing URI and/or DT: warc:{} offset:{}".format(
- warc_path, offset))
+ raise WaybackContentError(
+ "revisit record missing URI and/or DT: warc:{} offset:{}".format(
+ warc_path, offset))
# convert revisit_dt
# len("2018-07-24T11:56:49"), or with "Z"
assert len(revisit_dt) in (19, 20)
@@ -425,7 +456,9 @@ class WaybackClient:
revisit_uri = revisit_uri.decode('utf-8')
if type(revisit_dt) is bytes:
revisit_dt = revisit_dt.decode('utf-8')
- revisit_dt = revisit_dt.replace('-', '').replace(':', '').replace('T', '').replace('Z', '')
+ revisit_dt = revisit_dt.replace('-', '').replace(':',
+ '').replace('T',
+ '').replace('Z', '')
assert len(revisit_dt) == 14
try:
revisit_cdx = self.cdx_client.fetch(revisit_uri, revisit_dt)
@@ -443,10 +476,10 @@ class WaybackClient:
body = gwb_record.open_raw_content().read()
except IncompleteRead as ire:
raise WaybackError(
- "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ "failed to read actual file contents from wayback/petabox (IncompleteRead: {})"
+ .format(ire))
elif status_code is None:
- raise WaybackContentError(
- "got a None status_code in (W)ARC record")
+ raise WaybackContentError("got a None status_code in (W)ARC record")
return WarcResource(
status_code=status_code,
location=location,
@@ -454,7 +487,12 @@ class WaybackClient:
revisit_cdx=revisit_cdx,
)
- def fetch_petabox_body(self, csize, offset, warc_path, resolve_revisit=True, expected_status_code=None):
+ def fetch_petabox_body(self,
+ csize,
+ offset,
+ warc_path,
+ resolve_revisit=True,
+ expected_status_code=None):
"""
Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize.
@@ -474,12 +512,10 @@ class WaybackClient:
raise KeyError("archived HTTP response (WARC) was not {}: {}".format(
expected_status_code,
resource.status_code,
- )
- )
+ ))
elif resource.status_code not in (200, 226):
raise KeyError("archived HTTP response (WARC) was not 200: {}".format(
- resource.status_code)
- )
+ resource.status_code))
return resource.body
@@ -514,7 +550,9 @@ class WaybackClient:
except requests.exceptions.ChunkedEncodingError:
raise WaybackError("ChunkedEncodingError (wayback replay fetch)")
except UnicodeDecodeError:
- raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url))
+ raise WaybackContentError(
+ "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(
+ url))
try:
resp.raise_for_status()
@@ -526,21 +564,20 @@ class WaybackClient:
if not "X-Archive-Src" in resp.headers:
raise WaybackError("replay fetch didn't return X-Archive-Src in headers")
if not datetime in resp.url:
- raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url))
+ raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(
+ datetime, resp.url))
if cdx_sha1hex:
# verify that body matches CDX hash
# TODO: don't need *all* these hashes, just sha1
file_meta = gen_file_metadata(resp.content)
if cdx_sha1hex != file_meta['sha1hex']:
- print(" REPLAY MISMATCH: cdx:{} replay:{}".format(
- cdx_sha1hex,
- file_meta['sha1hex']),
- file=sys.stderr)
- raise WaybackContentError("replay fetch body didn't match CDX hash cdx:{} body:{}".format(
- cdx_sha1hex,
- file_meta['sha1hex']),
- )
+ print(" REPLAY MISMATCH: cdx:{} replay:{}".format(cdx_sha1hex,
+ file_meta['sha1hex']),
+ file=sys.stderr)
+ raise WaybackContentError(
+ "replay fetch body didn't match CDX hash cdx:{} body:{}".format(
+ cdx_sha1hex, file_meta['sha1hex']), )
return resp.content
def fetch_replay_redirect(self, url, datetime):
@@ -568,7 +605,9 @@ class WaybackClient:
except requests.exceptions.TooManyRedirects:
raise WaybackContentError("redirect loop (wayback replay fetch)")
except UnicodeDecodeError:
- raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url))
+ raise WaybackContentError(
+ "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(
+ url))
try:
resp.raise_for_status()
except Exception as e:
@@ -580,7 +619,8 @@ class WaybackClient:
if not "X-Archive-Src" in resp.headers:
raise WaybackError("redirect replay fetch didn't return X-Archive-Src in headers")
if not datetime in resp.url:
- raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url))
+ raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(
+ datetime, resp.url))
redirect_url = resp.headers.get("Location")
# eg, https://web.archive.org/web/20200111003923id_/https://dx.doi.org/10.17504/protocols.io.y2gfybw
@@ -622,7 +662,9 @@ class WaybackClient:
urls_seen = [start_url]
for i in range(self.max_redirects):
print(" URL: {}".format(next_url), file=sys.stderr)
- cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype, closest=closest)
+ cdx_row = self.cdx_client.lookup_best(next_url,
+ best_mimetype=best_mimetype,
+ closest=closest)
#print(cdx_row, file=sys.stderr)
if not cdx_row:
return ResourceResult(
@@ -776,9 +818,11 @@ class WaybackClient:
class SavePageNowError(Exception):
pass
+
class SavePageNowBackoffError(SandcrawlerBackoffError):
pass
+
SavePageNowResult = namedtuple('SavePageNowResult', [
'success',
'status',
@@ -789,13 +833,11 @@ SavePageNowResult = namedtuple('SavePageNowResult', [
'resources',
])
+
class SavePageNowClient:
-
def __init__(self, v2endpoint="https://web.archive.org/save", **kwargs):
- self.ia_access_key = kwargs.get('ia_access_key',
- os.environ.get('IA_ACCESS_KEY'))
- self.ia_secret_key = kwargs.get('ia_secret_key',
- os.environ.get('IA_SECRET_KEY'))
+ self.ia_access_key = kwargs.get('ia_access_key', os.environ.get('IA_ACCESS_KEY'))
+ self.ia_secret_key = kwargs.get('ia_secret_key', os.environ.get('IA_SECRET_KEY'))
self.v2endpoint = v2endpoint
self.v2_session = requests_retry_session(retries=5, backoff_factor=3)
self.v2_session.headers.update({
@@ -886,12 +928,15 @@ class SavePageNowClient:
},
)
if resp.status_code == 429:
- raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url))
+ raise SavePageNowBackoffError("status_code: {}, url: {}".format(
+ resp.status_code, request_url))
elif resp.status_code != 200:
- raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url))
+ raise SavePageNowError("SPN2 status_code: {}, url: {}".format(
+ resp.status_code, request_url))
resp_json = resp.json()
- if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']:
+ if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json[
+ 'message']:
raise SavePageNowBackoffError(resp_json['message'])
elif not resp_json or 'job_id' not in resp_json or not resp_json['job_id']:
raise SavePageNowError(
@@ -915,7 +960,8 @@ class SavePageNowClient:
final_json = resp.json()
break
else:
- raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(status, request_url))
+ raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(
+ status, request_url))
if not final_json:
raise SavePageNowError("SPN2 timed out (polling count exceeded)")
@@ -923,8 +969,10 @@ class SavePageNowClient:
# if there was a recent crawl of same URL, fetch the status of that
# crawl to get correct datetime
if final_json.get('original_job_id'):
- print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", file=sys.stderr)
- resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, final_json['original_job_id']))
+ print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}",
+ file=sys.stderr)
+ resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint,
+ final_json['original_job_id']))
try:
resp.raise_for_status()
except:
@@ -935,7 +983,8 @@ class SavePageNowClient:
if final_json['status'] == "success":
if final_json.get('original_url').startswith('/'):
- print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", file=sys.stderr)
+ print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}",
+ file=sys.stderr)
return SavePageNowResult(
True,
"success",
@@ -969,15 +1018,17 @@ class SavePageNowClient:
# HACK: capture CNKI domains with outlinks (for COVID-19 crawling)
if 'gzbd.cnki.net/' in start_url:
- spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get, capture_outlinks=1)
+ spn_result = self.save_url_now_v2(start_url,
+ force_simple_get=force_simple_get,
+ capture_outlinks=1)
else:
spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get)
if not spn_result.success:
status = spn_result.status
if status in ("error:invalid-url", "error:not-found",
- "error:invalid-host-resolution", "error:gateway-timeout",
- "error:too-many-redirects", "error:read-timeout"):
+ "error:invalid-host-resolution", "error:gateway-timeout",
+ "error:too-many-redirects", "error:read-timeout"):
status = status.replace("error:", "")
elif status in ("error:no-access", "error:forbidden"):
status = "forbidden"
@@ -988,7 +1039,8 @@ class SavePageNowClient:
elif status.startswith("error:"):
status = "spn2-" + status
# despite other errors, call these a failure (so we don't retry)
- if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1")):
+ if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent')
+ or spn_result.terminal_url.endswith("cookieSet=1")):
status = "blocked-cookie"
return ResourceResult(
start_url=start_url,
@@ -1018,7 +1070,8 @@ class SavePageNowClient:
)
# don't try to CDX fetch for this common cookie block terminal
- if spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"):
+ if spn_result.terminal_url.endswith(
+ '/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"):
return ResourceResult(
start_url=start_url,
hit=False,
@@ -1143,9 +1196,12 @@ class SavePageNowClient:
)
-def fix_transfer_encoding(file_meta: dict, resource: ResourceResult) -> Tuple[dict, ResourceResult]:
- if resource.body and file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
- print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr)
+def fix_transfer_encoding(file_meta: dict,
+ resource: ResourceResult) -> Tuple[dict, ResourceResult]:
+ if resource.body and file_meta[
+ 'mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip':
+ print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype),
+ file=sys.stderr)
inner_body = gzip.decompress(resource.body)
if not inner_body:
raise Exception("null body inside transfer encoding")
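Note: the ia.py hunks above are formatting-only; the logic of fix_transfer_encoding() is unchanged. As a minimal standalone sketch of the same gzip check (standard library only; the helper name and the usage line are illustrative, not part of the sandcrawler API):

import gzip

def strip_gzip_transfer_encoding(body: bytes, file_mimetype: str, cdx_mimetype: str) -> bytes:
    # a capture sniffed as gzip while the CDX record claims another mimetype
    # still carries its HTTP transfer encoding; decompress the payload
    if body and file_mimetype == 'application/gzip' and cdx_mimetype != 'application/gzip':
        inner_body = gzip.decompress(body)
        if not inner_body:
            raise Exception("null body inside transfer encoding")
        return inner_body
    return body

# usage: a gzip-wrapped PDF comes back out as the raw PDF bytes
assert strip_gzip_transfer_encoding(gzip.compress(b"%PDF-1.4"), 'application/gzip', 'application/pdf') == b"%PDF-1.4"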
diff --git a/python/sandcrawler/ingest_file.py b/python/sandcrawler/ingest_file.py
index 137a793..b480cc2 100644
--- a/python/sandcrawler/ingest_file.py
+++ b/python/sandcrawler/ingest_file.py
@@ -1,4 +1,3 @@
-
import base64
import gzip
import json
@@ -15,18 +14,22 @@ from selectolax.parser import HTMLParser
from sandcrawler.db import SandcrawlerPostgrestClient
from sandcrawler.grobid import GrobidClient
from sandcrawler.html import extract_fulltext_url
-from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules
-from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, ResourceResult, SavePageNowClient,
- SavePageNowError, WaybackClient, WaybackContentError, WaybackError, cdx_to_dict,
+from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio,
+ html_extract_resources, load_adblock_rules)
+from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError,
+ ResourceResult, SavePageNowClient, SavePageNowError, WaybackClient,
+ WaybackContentError, WaybackError, cdx_to_dict,
fix_transfer_encoding)
-from sandcrawler.ingest_html import (WebResource, fetch_html_resources, html_extract_body_teixml, html_guess_platform,
+from sandcrawler.ingest_html import (WebResource, fetch_html_resources,
+ html_extract_body_teixml, html_guess_platform,
html_guess_scope, quick_fetch_html_resources)
from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime
from sandcrawler.pdfextract import PdfExtractResult, process_pdf
from sandcrawler.workers import SandcrawlerWorker
from sandcrawler.xml import xml_reserialize
-MAX_BODY_SIZE_BYTES = 128*1024*1024
+MAX_BODY_SIZE_BYTES = 128 * 1024 * 1024
+
class IngestFileWorker(SandcrawlerWorker):
"""
@@ -54,7 +57,6 @@ class IngestFileWorker(SandcrawlerWorker):
process_file_hit(ResourceResult) -> response
process_grobid(ResourceResult)
"""
-
def __init__(self, sink=None, **kwargs):
super().__init__()
@@ -64,7 +66,8 @@ class IngestFileWorker(SandcrawlerWorker):
self.wayback_client = WaybackClient()
self.spn_client = kwargs.get('spn_client')
if not self.spn_client:
- self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0))
+ self.spn_client = SavePageNowClient(
+ spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0))
self.grobid_client = kwargs.get('grobid_client')
if not self.grobid_client:
self.grobid_client = GrobidClient()
@@ -123,13 +126,13 @@ class IngestFileWorker(SandcrawlerWorker):
"fao.org/glis/",
# Historical non-paper content:
- "dhz.uni-passau.de/", # newspapers
- "digital.ucd.ie/", # ireland national historical
+ "dhz.uni-passau.de/", # newspapers
+ "digital.ucd.ie/", # ireland national historical
# DOI prefixes
- "doi.org/10.2307/", # JSTOR; slow and many redirects
- "doi.org/10.18730/", # fao.org: database entry
- "doi.org/10.15468/", # gbif.org: database entry
+ "doi.org/10.2307/", # JSTOR; slow and many redirects
+ "doi.org/10.18730/", # fao.org: database entry
+ "doi.org/10.15468/", # gbif.org: database entry
# deprecated domain (doesn't redirect correctly)
"://edoc.mpg.de/",
@@ -173,10 +176,10 @@ class IngestFileWorker(SandcrawlerWorker):
"video/mpeg",
"text/plain",
"text/csv",
- "text/x-r-source", # dataverse
- "text/tab-separated-values", # dataverse
- "text/x-rst", # dataverse
- "application/x-rlang-transport", # dataverse
+ "text/x-r-source", # dataverse
+ "text/tab-separated-values", # dataverse
+ "text/x-rst", # dataverse
+ "application/x-rlang-transport", # dataverse
"application/json",
"application/xml",
"application/pdf",
@@ -194,7 +197,6 @@ class IngestFileWorker(SandcrawlerWorker):
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
]
-
def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
"""
Check in sandcrawler-db (postgres) to see if we have already ingested
@@ -214,7 +216,10 @@ class IngestFileWorker(SandcrawlerWorker):
else:
return None
- def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]:
+ def find_resource(self,
+ url,
+ best_mimetype=None,
+ force_recrawl=False) -> Optional[ResourceResult]:
"""
Looks in wayback for a resource starting at the URL, following any
redirects. If a hit isn't found, try crawling with SPN.
@@ -222,7 +227,8 @@ class IngestFileWorker(SandcrawlerWorker):
via = "none"
resource = None
- if url.startswith("http://web.archive.org/web/") or url.startswith("https://web.archive.org/web/"):
+ if url.startswith("http://web.archive.org/web/") or url.startswith(
+ "https://web.archive.org/web/"):
raise NotImplementedError("handling direct wayback links not supported yet")
if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"):
@@ -243,14 +249,13 @@ class IngestFileWorker(SandcrawlerWorker):
if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000':
old_failure = True
- if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure):
+ if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture')
+ or soft404 or old_failure):
via = "spn2"
resource = self.spn_client.crawl_resource(url, self.wayback_client)
- print("[FETCH {:>6}] {} {}".format(
- via,
- (resource and resource.status),
- (resource and resource.terminal_url) or url),
- file=sys.stderr)
+ print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status),
+ (resource and resource.terminal_url) or url),
+ file=sys.stderr)
return resource
def process_existing(self, request: dict, result_row: dict) -> dict:
@@ -262,7 +267,8 @@ class IngestFileWorker(SandcrawlerWorker):
assert result_row['hit']
existing_file_meta = self.pgrest_client.get_file_meta(result_row['terminal_sha1hex'])
existing_grobid = self.pgrest_client.get_grobid(result_row['terminal_sha1hex'])
- existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], result_row['terminal_dt'])
+ existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'],
+ result_row['terminal_dt'])
if not (existing_file_meta and existing_grobid and existing_cdx):
raise NotImplementedError("partially-exsiting records not implemented yet")
result = {
@@ -281,11 +287,13 @@ class IngestFileWorker(SandcrawlerWorker):
}
return result
- def process_file_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict:
+ def process_file_hit(self, ingest_type: str, resource: ResourceResult,
+ file_meta: dict) -> dict:
"""
Run all the necessary processing for a new/fresh ingest hit.
"""
- if ingest_type in ["dataset-file", "component"] and file_meta['mimetype'] == "application/pdf":
+ if ingest_type in ["dataset-file", "component"
+ ] and file_meta['mimetype'] == "application/pdf":
ingest_type = "pdf"
if ingest_type == "pdf":
return {
@@ -396,24 +404,26 @@ class IngestFileWorker(SandcrawlerWorker):
try:
html_doc = HTMLParser(resource.body)
except ValueError as ve:
- return dict(
- status="html-selectolax-error",
- )
+ return dict(status="html-selectolax-error", )
html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
assert html_biblio
html_body = html_extract_body_teixml(resource.body)
html_platform = html_guess_platform(resource.terminal_url, html_doc, html_biblio)
- html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('word_count'))
+ html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio,
+ html_body.get('word_count'))
html_biblio_dict = json.loads(html_biblio.json(exclude_none=True))
- if html_scope in ('blocked-captcha','blocked-cookie','blocked-forbidden'):
+ if html_scope in ('blocked-captcha', 'blocked-cookie', 'blocked-forbidden'):
return dict(
status=html_scope,
html_biblio=html_biblio_dict,
scope=html_scope,
platform=html_platform,
)
- elif html_scope not in ('article-fulltext','unknown',):
+ elif html_scope not in (
+ 'article-fulltext',
+ 'unknown',
+ ):
html_body.pop("tei_xml", None)
return dict(
status="wrong-scope",
@@ -423,7 +433,8 @@ class IngestFileWorker(SandcrawlerWorker):
html_body=html_body,
)
- raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules)
+ raw_resources = html_extract_resources(resource.terminal_url, html_doc,
+ self.adblock_rules)
if len(raw_resources) > self.max_html_resources:
html_body.pop("tei_xml", None)
return dict(
@@ -452,7 +463,9 @@ class IngestFileWorker(SandcrawlerWorker):
try:
if self.html_quick_mode:
print(" WARN: running quick CDX-only fetches", file=sys.stderr)
- full_resources = quick_fetch_html_resources(raw_resources, self.wayback_client.cdx_client, when)
+ full_resources = quick_fetch_html_resources(raw_resources,
+ self.wayback_client.cdx_client,
+ when)
else:
full_resources = fetch_html_resources(raw_resources, self.wayback_client, when)
except PetaboxError as e:
@@ -572,7 +585,9 @@ class IngestFileWorker(SandcrawlerWorker):
return result
try:
- resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl)
+ resource = self.find_resource(next_url,
+ best_mimetype,
+ force_recrawl=force_recrawl)
except SavePageNowError as e:
result['status'] = 'spn2-error'
result['error_message'] = str(e)[:1600]
@@ -650,10 +665,9 @@ class IngestFileWorker(SandcrawlerWorker):
# here we split based on ingest type to try and extract a next hop
html_ish_resource = bool(
"html" in file_meta['mimetype']
- or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
+ or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
or "application/xml" in file_meta['mimetype']
- or "text/xml" in file_meta['mimetype']
- )
+ or "text/xml" in file_meta['mimetype'])
html_biblio = None
html_doc = None
if html_ish_resource and resource.body:
@@ -662,7 +676,8 @@ class IngestFileWorker(SandcrawlerWorker):
html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
if html_biblio:
if not 'html_biblio' in result or html_biblio.title:
- result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True))
+ result['html_biblio'] = json.loads(
+ html_biblio.json(exclude_none=True))
#print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
except ValueError:
pass
@@ -686,18 +701,19 @@ class IngestFileWorker(SandcrawlerWorker):
assert next_url
next_url = clean_url(next_url)
print("[PARSE {:>6}] {} {}".format(
- ingest_type,
- fulltext_url.get('technique'),
- next_url,
- ),
- file=sys.stderr)
+ ingest_type,
+ fulltext_url.get('technique'),
+ next_url,
+ ),
+ file=sys.stderr)
if next_url in hops:
result['status'] = 'link-loop'
result['error_message'] = "repeated: {}".format(next_url)
return result
hops.append(next_url)
continue
- elif ingest_type in ("xml", "html", "component") and html_ish_resource and html_biblio:
+ elif ingest_type in ("xml", "html",
+ "component") and html_ish_resource and html_biblio:
# NOTE: src_fulltext_url is not a thing
next_url_found = None
if ingest_type == "xml" and html_biblio.xml_fulltext_url:
@@ -711,11 +727,11 @@ class IngestFileWorker(SandcrawlerWorker):
next_url = next_url_found
technique = "html_biblio"
print("[PARSE {:>6}] {} {}".format(
- ingest_type,
- technique,
- next_url,
- ),
- file=sys.stderr)
+ ingest_type,
+ technique,
+ next_url,
+ ),
+ file=sys.stderr)
if next_url in hops:
if ingest_type == "html":
# for HTML ingest, we don't count this as a link-loop
@@ -756,7 +772,8 @@ class IngestFileWorker(SandcrawlerWorker):
result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
return result
elif ingest_type == "xml":
- if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
+ if file_meta['mimetype'] not in ("application/xml", "text/xml",
+ "application/jats+xml"):
result['status'] = "wrong-mimetype"
return result
elif ingest_type == "html":
@@ -786,18 +803,18 @@ class IngestFileWorker(SandcrawlerWorker):
result['hit'] = True
if ingest_type == "pdf":
print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format(
- ingest_type,
- result.get('file_meta', {}).get('sha1hex'),
- result.get('grobid', {}).get('status_code'),
- result.get('pdf_meta', {}).get('status'),
- ),
- file=sys.stderr)
+ ingest_type,
+ result.get('file_meta', {}).get('sha1hex'),
+ result.get('grobid', {}).get('status_code'),
+ result.get('pdf_meta', {}).get('status'),
+ ),
+ file=sys.stderr)
else:
print("[SUCCESS {:>5}] sha1:{}".format(
- ingest_type,
- result.get('file_meta', {}).get('sha1hex'),
- ),
- file=sys.stderr)
+ ingest_type,
+ result.get('file_meta', {}).get('sha1hex'),
+ ),
+ file=sys.stderr)
return result
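Note: the ingest_file.py hunks above only re-wrap long expressions. For reference, the mimetype predicate that decides whether a fetched body gets parsed as HTML-ish content reads like this in isolation (the helper name is hypothetical; the condition itself is copied from the diff):

def is_html_ish(mimetype: str) -> bool:
    return ("html" in mimetype
            or "xhtml" in mimetype              # matches "application/xhtml+xml"
            or "application/xml" in mimetype
            or "text/xml" in mimetype)

assert is_html_ish("application/xhtml+xml")
assert not is_html_ish("application/pdf")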
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index 11386df..5cbb908 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -1,4 +1,3 @@
-
import gzip
import json
import sys
@@ -14,17 +13,21 @@ from sandcrawler.fileset_platforms import DATASET_PLATFORM_HELPER_TABLE, Fileset
from sandcrawler.fileset_strategies import FILESET_STRATEGY_HELPER_TABLE, FilesetIngestStrategy
from sandcrawler.fileset_types import PlatformRestrictedError, PlatformScopeError
from sandcrawler.html import extract_fulltext_url
-from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules
-from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, ResourceResult, SavePageNowClient,
- SavePageNowError, WaybackClient, WaybackContentError, WaybackError, cdx_to_dict,
+from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio,
+ html_extract_resources, load_adblock_rules)
+from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError,
+ ResourceResult, SavePageNowClient, SavePageNowError, WaybackClient,
+ WaybackContentError, WaybackError, cdx_to_dict,
fix_transfer_encoding)
from sandcrawler.ingest_file import IngestFileWorker
-from sandcrawler.ingest_html import (WebResource, fetch_html_resources, html_extract_body_teixml, html_guess_platform,
+from sandcrawler.ingest_html import (WebResource, fetch_html_resources,
+ html_extract_body_teixml, html_guess_platform,
html_guess_scope, quick_fetch_html_resources)
from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime
from sandcrawler.workers import SandcrawlerWorker
-MAX_BODY_SIZE_BYTES = 128*1024*1024
+MAX_BODY_SIZE_BYTES = 128 * 1024 * 1024
+
class IngestFilesetWorker(IngestFileWorker):
"""
@@ -39,14 +42,13 @@ class IngestFilesetWorker(IngestFileWorker):
checking to see if content has been archived already)
4. summarize status
"""
-
def __init__(self, sink=None, **kwargs):
super().__init__(sink=None, **kwargs)
self.sink = sink
self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE
self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
- self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024)
+ self.max_total_size = kwargs.get('max_total_size', 64 * 1024 * 1024 * 1024)
self.max_file_count = kwargs.get('max_file_count', 200)
self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink')
self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False)
@@ -72,11 +74,12 @@ class IngestFilesetWorker(IngestFileWorker):
raise NotImplementedError("process_existing() not tested or safe yet")
def want(self, request: dict) -> bool:
- if not request.get('ingest_type') in ('dataset',):
+ if not request.get('ingest_type') in ('dataset', ):
return False
return True
- def fetch_resource_iteratively(self, ingest_type: str, base_url: str, force_recrawl: bool) -> dict:
+ def fetch_resource_iteratively(self, ingest_type: str, base_url: str,
+ force_recrawl: bool) -> dict:
"""
This is copypasta from process_file(), should probably refactor.
"""
@@ -174,10 +177,9 @@ class IngestFilesetWorker(IngestFileWorker):
# here we split based on ingest type to try and extract a next hop
html_ish_resource = bool(
"html" in file_meta['mimetype']
- or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
+ or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
or "application/xml" in file_meta['mimetype']
- or "text/xml" in file_meta['mimetype']
- )
+ or "text/xml" in file_meta['mimetype'])
html_biblio = None
html_doc = None
if html_ish_resource and resource.body:
@@ -186,7 +188,8 @@ class IngestFilesetWorker(IngestFileWorker):
html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
if html_biblio:
if not 'html_biblio' in result or html_biblio.title:
- result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True))
+ result['html_biblio'] = json.loads(
+ html_biblio.json(exclude_none=True))
#print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
except ValueError:
pass
@@ -214,7 +217,8 @@ class IngestFilesetWorker(IngestFileWorker):
result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
return result
elif ingest_type == "xml":
- if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
+ if file_meta['mimetype'] not in ("application/xml", "text/xml",
+ "application/jats+xml"):
result['status'] = "wrong-mimetype"
return result
elif ingest_type == "html":
@@ -229,11 +233,10 @@ class IngestFilesetWorker(IngestFileWorker):
result['_resource'] = resource
return result
-
def process(self, request: dict, key: Any = None) -> dict:
ingest_type = request.get('ingest_type')
- if ingest_type not in ("dataset",):
+ if ingest_type not in ("dataset", ):
raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
# parse/clean URL
@@ -250,7 +253,9 @@ class IngestFilesetWorker(IngestFileWorker):
#if existing:
# return self.process_existing(request, existing)
- result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl)
+ result = self.fetch_resource_iteratively(ingest_type,
+ base_url,
+ force_recrawl=force_recrawl)
result['request'] = request
if result.get('status') != None:
result['request'] = request
@@ -323,14 +328,16 @@ class IngestFilesetWorker(IngestFileWorker):
return result
if result['file_count'] > self.max_file_count:
# hard max, to prevent downstream breakage
- if result['file_count'] > 10*1000:
+ if result['file_count'] > 10 * 1000:
result['manifest'] = result['manifest'][:self.max_file_count]
result['status'] = 'too-many-files'
return result
ingest_strategy = platform_helper.chose_strategy(dataset_meta)
result['ingest_strategy'] = ingest_strategy
- print(f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}", file=sys.stderr)
+ print(
+ f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}",
+ file=sys.stderr)
strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy)
if not strategy_helper:
@@ -349,7 +356,8 @@ class IngestFilesetWorker(IngestFileWorker):
if archive_result.bundle_file_meta:
result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta
if archive_result.archiveorg_bundle_path:
- result['fileset_bundle']['archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path
+ result['fileset_bundle'][
+ 'archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path
if archive_result.bundle_resource:
result['fileset_bundle']['terminal'] = dict(
terminal_url=archive_result.bundle_resource.terminal_url,
@@ -357,14 +365,16 @@ class IngestFilesetWorker(IngestFileWorker):
terminal_status_code=archive_result.bundle_resource.terminal_status_code,
)
if archive_result.bundle_resource.cdx:
- result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_resource.cdx)
+ result['fileset_bundle']['cdx'] = cdx_to_dict(
+ archive_result.bundle_resource.cdx)
if archive_result.bundle_resource.revisit_cdx:
- result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_resource.revisit_cdx)
+ result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(
+ archive_result.bundle_resource.revisit_cdx)
if ingest_strategy.endswith('-file'):
result['fileset_file'] = dict()
if archive_result.file_file_meta:
- result['fileset_file']['file_meta'] = file_meta=archive_result.file_file_meta,
+ result['fileset_file']['file_meta'] = file_meta = archive_result.file_file_meta,
if archive_result.file_resource:
result['fileset_file']['terminal'] = dict(
terminal_url=archive_result.file_resource.terminal_url,
@@ -372,16 +382,20 @@ class IngestFilesetWorker(IngestFileWorker):
terminal_status_code=archive_result.file_resource.terminal_status_code,
)
if archive_result.file_resource.cdx:
- result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
+ result['fileset_file']['cdx'] = cdx_to_dict(
+ archive_result.file_resource.cdx)
if archive_result.file_resource.revisit_cdx:
- result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
+ result['fileset_file']['revisit_cdx'] = cdx_to_dict(
+ archive_result.file_resource.revisit_cdx)
if result['status'].startswith('success'):
# check that these are still valid
assert result['file_count'] == len(archive_result.manifest)
- assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size])
+ assert result['total_size'] == sum(
+ [m.size for m in archive_result.manifest if m.size])
- if result['status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta:
+ if result[
+ 'status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta:
file_result = dict(
hit=True,
status='success',
@@ -397,10 +411,13 @@ class IngestFilesetWorker(IngestFileWorker):
if archive_result.file_resource.cdx:
file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
if archive_result.file_resource.revisit_cdx:
- file_result['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
+ file_result['revisit_cdx'] = cdx_to_dict(
+ archive_result.file_resource.revisit_cdx)
file_result['request']['ingest_type'] = request['ingest_type'] + "-file"
# call the super() (ingest_file) version of process_hit()
- info = self.process_file_hit(file_result['request']['ingest_type'], archive_result.file_resource, archive_result.file_file_meta)
+ info = self.process_file_hit(file_result['request']['ingest_type'],
+ archive_result.file_resource,
+ archive_result.file_file_meta)
file_result.update(info)
if self.ingest_file_result_sink:
self.ingest_file_result_sink.push_record(result.copy())
@@ -410,17 +427,19 @@ class IngestFilesetWorker(IngestFileWorker):
if result['status'].startswith('success'):
result['hit'] = True
print("[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format(
- ingest_type,
- result['file_count'],
- result['total_size'],
- ingest_strategy,
- ), file=sys.stderr)
+ ingest_type,
+ result['file_count'],
+ result['total_size'],
+ ingest_strategy,
+ ),
+ file=sys.stderr)
else:
print("[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format(
- ingest_type,
- result['status'],
- result['file_count'],
- result['total_size'],
- ingest_strategy,
- ), file=sys.stderr)
+ ingest_type,
+ result['status'],
+ result['file_count'],
+ result['total_size'],
+ ingest_strategy,
+ ),
+ file=sys.stderr)
return result
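Note: the ingest_fileset.py changes keep the same limits (max_file_count=200, with a hard cut at 10 * 1000 files). A rough sketch of that guard on its own; the helper name and the 'ok' status for the passing case are illustrative, while the 'too-many-files' status and the thresholds come from the code above:

def apply_file_count_limit(manifest: list, max_file_count: int = 200) -> dict:
    file_count = len(manifest)
    if file_count > max_file_count:
        # hard max, to prevent downstream breakage
        if file_count > 10 * 1000:
            manifest = manifest[:max_file_count]
        return dict(status='too-many-files', manifest=manifest)
    return dict(status='ok', manifest=manifest)

assert apply_file_count_limit(list(range(50000)))['status'] == 'too-many-files'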
diff --git a/python/sandcrawler/ingest_html.py b/python/sandcrawler/ingest_html.py
index 9c72dd5..bf25d5d 100644
--- a/python/sandcrawler/ingest_html.py
+++ b/python/sandcrawler/ingest_html.py
@@ -1,4 +1,3 @@
-
import argparse
import datetime
import io
@@ -11,16 +10,20 @@ import pydantic
import trafilatura
from selectolax.parser import HTMLParser
-from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules
-from sandcrawler.ia import (CdxApiClient, NoCaptureError, ResourceResult, WaybackClient, WaybackContentError,
- cdx_to_dict, fix_transfer_encoding)
-from sandcrawler.misc import clean_url, datetime_to_cdx, gen_file_metadata, parse_cdx_datetime, url_fuzzy_equal
+from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio,
+ html_extract_resources, load_adblock_rules)
+from sandcrawler.ia import (CdxApiClient, NoCaptureError, ResourceResult, WaybackClient,
+ WaybackContentError, cdx_to_dict, fix_transfer_encoding)
+from sandcrawler.misc import (clean_url, datetime_to_cdx, gen_file_metadata, parse_cdx_datetime,
+ url_fuzzy_equal)
TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}"
+
def html_extract_body_teixml(doc: bytes) -> dict:
try:
- tei_xml = trafilatura.extract(doc,
+ tei_xml = trafilatura.extract(
+ doc,
output_format='xmltei',
include_comments=False,
include_formatting=True,
@@ -33,13 +36,19 @@ def html_extract_body_teixml(doc: bytes) -> dict:
if tei_xml:
body_txt = teixml_body_text(tei_xml)
word_count = len(body_txt.split())
- return dict(status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count)
- elif doc.startswith(b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 2012"http://www.w3.org/TR/html4/loose.dtd">'):
+ return dict(status="success",
+ agent=TRAFILATURA_AGENT,
+ tei_xml=tei_xml,
+ word_count=word_count)
+ elif doc.startswith(
+ b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 2012"http://www.w3.org/TR/html4/loose.dtd">'
+ ):
# hack for firstmonday.org
return html_extract_body_teixml(doc[106:])
else:
return dict(status="empty-xml", agent=TRAFILATURA_AGENT)
+
def teixml_body_text(doc_xml: str) -> str:
ns = {"tei": "http://www.tei-c.org/ns/1.0"}
tree = ET.fromstring(doc_xml)
@@ -49,6 +58,7 @@ def teixml_body_text(doc_xml: str) -> str:
else:
return ""
+
class WebResource(pydantic.BaseModel):
surt: str
timestamp: datetime.datetime
@@ -61,16 +71,15 @@ class WebResource(pydantic.BaseModel):
resource_type: Optional[str]
class Config:
- json_encoders = {
- datetime.datetime: lambda dt: dt.isoformat()
- }
+ json_encoders = {datetime.datetime: lambda dt: dt.isoformat()}
+
class IngestWebResult(pydantic.BaseModel):
status: str
hit: bool
error_message: Optional[str]
cdx: Optional[dict]
- terminal: Optional[Any] # TODO
+ terminal: Optional[Any] # TODO
request: Optional[Any] # TODO
file_meta: Optional[dict]
html_biblio: Optional[BiblioMetadata]
@@ -84,6 +93,7 @@ class IngestWebResult(pydantic.BaseModel):
datetime.datetime: lambda dt: dt.isoformat(),
}
+
class HtmlMetaRow(pydantic.BaseModel):
sha1hex: str
status: str
@@ -106,7 +116,7 @@ class HtmlMetaRow(pydantic.BaseModel):
"""
return (
self.sha1hex,
- datetime.datetime.now(), # updated
+ datetime.datetime.now(), # updated
self.status,
self.scope,
self.has_teixml,
@@ -117,7 +127,8 @@ class HtmlMetaRow(pydantic.BaseModel):
)
-def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime]) -> List[WebResource]:
+def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient,
+ when: Optional[datetime.datetime]) -> List[WebResource]:
"""
This is the lazy version that just does a CDX lookup for each resource.
@@ -132,27 +143,30 @@ def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient,
if not cdx_row:
raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
if cdx_row.url != resource['url'] and not url_fuzzy_equal(cdx_row.url, resource['url']):
- print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr)
+ print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}",
+ file=sys.stderr)
if not cdx_row.status_code:
# TODO: fall back to a full fetch?
print(f" WARN: skipping revisit record", file=sys.stderr)
continue
- full.append(WebResource(
- surt=cdx_row.surt,
- timestamp=cdx_row.datetime,
- url=cdx_row.url,
- sha1hex=cdx_row.sha1hex,
- mimetype=cdx_row.mimetype,
- status_code=cdx_row.status_code,
- size=None,
- sha256hex=None,
- resource_type=resource['type'],
- ))
+ full.append(
+ WebResource(
+ surt=cdx_row.surt,
+ timestamp=cdx_row.datetime,
+ url=cdx_row.url,
+ sha1hex=cdx_row.sha1hex,
+ mimetype=cdx_row.mimetype,
+ status_code=cdx_row.status_code,
+ size=None,
+ sha256hex=None,
+ resource_type=resource['type'],
+ ))
return full
-def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]) -> List[WebResource]:
+def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient,
+ when: Optional[datetime.datetime]) -> List[WebResource]:
"""
This is the full version which fetches each resource from wayback/petabox
and calculates additional hashes.
@@ -168,23 +182,28 @@ def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, w
raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}")
file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True)
if file_meta['sha1hex'] != wayback_resp.cdx.sha1hex:
- raise WaybackContentError(f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}")
- full.append(WebResource(
- surt=wayback_resp.cdx.surt,
- timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime),
- url=wayback_resp.cdx.url,
- sha1hex=file_meta['sha1hex'],
- mimetype=file_meta['mimetype'],
- status_code=wayback_resp.cdx.status_code or wayback_resp.revisit_cdx.status_code,
- size=file_meta['size_bytes'],
- sha256hex=file_meta['sha256hex'],
- resource_type=resource['type'],
- ))
+ raise WaybackContentError(
+ f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}"
+ )
+ full.append(
+ WebResource(
+ surt=wayback_resp.cdx.surt,
+ timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime),
+ url=wayback_resp.cdx.url,
+ sha1hex=file_meta['sha1hex'],
+ mimetype=file_meta['mimetype'],
+ status_code=wayback_resp.cdx.status_code
+ or wayback_resp.revisit_cdx.status_code,
+ size=file_meta['size_bytes'],
+ sha256hex=file_meta['sha256hex'],
+ resource_type=resource['type'],
+ ))
return full
-def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]) -> Optional[str]:
+def html_guess_platform(url: str, doc: HTMLParser,
+ biblio: Optional[BiblioMetadata]) -> Optional[str]:
generator: Optional[str] = None
generator_elem = doc.css_first("meta[name='generator']")
@@ -229,7 +248,9 @@ def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetada
return None
-def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]) -> str:
+
+def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata],
+ word_count: Optional[int]) -> str:
"""
This function tries to guess if an HTML document represents one of:
@@ -328,7 +349,9 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]
return "unknown"
-def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = False) -> IngestWebResult:
+def run_single(url: str,
+ timestamp: Optional[str] = None,
+ quick_mode: bool = False) -> IngestWebResult:
adblock = load_adblock_rules()
wayback_client = WaybackClient()
@@ -375,7 +398,8 @@ def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = Fal
full_resources: List[WebResource] = []
if quick_mode:
- full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, when)
+ full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client,
+ when)
else:
full_resources = fetch_html_resources(raw_resources, wayback_client, when)
@@ -399,14 +423,11 @@ def main() -> None:
python -m sandcrawler.ingest_html
"""
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter
- )
+ parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
subparsers = parser.add_subparsers()
- sub = subparsers.add_parser(
- "single", help="tries to ingest a single URL, dumps result to stdout"
- )
+ sub = subparsers.add_parser("single",
+ help="tries to ingest a single URL, dumps result to stdout")
sub.set_defaults(func="run_single")
sub.add_argument(
"url",
@@ -437,5 +458,6 @@ def main() -> None:
#func()
raise NotImplementedError()
+
if __name__ == "__main__":
main()
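Note: ingest_html.py still drives trafilatura with the same keyword arguments after the reformat. A minimal standalone use of those options (the function name is illustrative; only the arguments visible in the diff are passed, and very short documents may legitimately come back empty):

import trafilatura

def extract_body_teixml(doc) -> dict:
    tei_xml = trafilatura.extract(
        doc,
        output_format='xmltei',
        include_comments=False,
        include_formatting=True,
    )
    if tei_xml:
        return dict(status="success", tei_xml=tei_xml)
    return dict(status="empty-xml")

print(extract_body_teixml("<html><body><p>hello world, this is a test</p></body></html>"))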
diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py
index b617178..188621f 100644
--- a/python/sandcrawler/minio.py
+++ b/python/sandcrawler/minio.py
@@ -1,4 +1,3 @@
-
import hashlib
import io
import os
@@ -7,7 +6,6 @@ import minio
class SandcrawlerMinioClient(object):
-
def __init__(self, host_url, access_key, secret_key, default_bucket=None):
"""
host is minio connection string (host:port)
diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py
index cf8c4bd..ddbd95a 100644
--- a/python/sandcrawler/misc.py
+++ b/python/sandcrawler/misc.py
@@ -1,4 +1,3 @@
-
import base64
import datetime
import hashlib
@@ -19,22 +18,28 @@ def clean_url(s: str) -> str:
parsed.colon_before_port = b''
return str(urlcanon.whatwg(parsed))
+
def url_fuzzy_equal(left: str, right: str) -> bool:
"""
TODO: use proper surt library and canonicalization for this check
"""
- fuzzy_left = '://'.join(clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:])
- fuzzy_right = '://'.join(clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:])
+ fuzzy_left = '://'.join(
+ clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:])
+ fuzzy_right = '://'.join(
+ clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:])
if fuzzy_left == fuzzy_right:
return True
elif fuzzy_left == fuzzy_right + "/" or fuzzy_right == fuzzy_left + "/":
return True
return False
+
def test_url_fuzzy_equal() -> None:
assert True == url_fuzzy_equal(
"http://www.annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree",
- "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree")
+ "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree"
+ )
+
def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
"""
@@ -45,10 +50,10 @@ def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
assert blob is not None
if not allow_empty:
assert blob
- if len(blob) < 1024*1024:
+ if len(blob) < 1024 * 1024:
mimetype = magic.Magic(mime=True).from_buffer(blob)
else:
- mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024*1024)])
+ mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024 * 1024)])
if mimetype in ("application/xml", "text/xml"):
# crude checks for XHTML or JATS XML, using only first 1 kB of file
if b"<htm" in blob[:1024] and b'xmlns="http://www.w3.org/1999/xhtml"' in blob[:1024]:
@@ -70,6 +75,7 @@ def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict:
mimetype=mimetype,
)
+
def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict:
"""
Variant of gen_file_metadata() which works with files on local disk
@@ -92,7 +98,7 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict:
size_bytes = 0
with open(path, 'rb') as f:
while True:
- chunk = f.read(1024*1024)
+ chunk = f.read(1024 * 1024)
if not chunk:
break
size_bytes += len(chunk)
@@ -108,6 +114,7 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict:
mimetype=mimetype,
)
+
def b32_hex(s: str) -> str:
"""
Converts a base32-encoded SHA-1 checksum into hex-encoded
@@ -123,6 +130,7 @@ def b32_hex(s: str) -> str:
raise ValueError("not a base-32 encoded SHA-1 hash: {}".format(s))
return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
+
NORMAL_MIME = (
'application/pdf',
'application/postscript',
@@ -131,6 +139,7 @@ NORMAL_MIME = (
'application/octet-stream',
)
+
def normalize_mime(raw: str) -> Optional[str]:
raw = raw.lower().strip()
for norm in NORMAL_MIME:
@@ -142,9 +151,7 @@ def normalize_mime(raw: str) -> Optional[str]:
return 'text/xml'
if raw.startswith('application/x-pdf'):
return 'application/pdf'
- if raw in (
- '.pdf',
- ):
+ if raw in ('.pdf', ):
return 'application/pdf'
if raw in (
'application/download',
@@ -154,7 +161,7 @@ def normalize_mime(raw: str) -> Optional[str]:
'application/octetstream',
'application/force-download',
'application/unknown',
- ):
+ ):
return 'application/octet-stream'
return None
@@ -193,8 +200,8 @@ def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]:
offset = cdx[9]
warc = cdx[10]
- if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit()
- and len(sha1b32) == 32 and dt.isdigit()):
+ if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit() and len(sha1b32) == 32
+ and dt.isdigit()):
return None
if '-' in (surt, dt, url, http_status, sha1b32, c_size, offset, warc):
@@ -221,6 +228,7 @@ def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]:
warc_path=warc,
)
+
def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]:
if not dt_str:
return None
@@ -229,23 +237,39 @@ def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]:
except Exception:
return None
+
def test_parse_cdx_datetime() -> None:
assert parse_cdx_datetime("") == None
assert parse_cdx_datetime("asdf") == None
assert parse_cdx_datetime("19930203123045") != None
- assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)
+ assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020,
+ month=10,
+ day=28,
+ hour=23,
+ minute=51,
+ second=3)
+
def datetime_to_cdx(dt: datetime.datetime) -> str:
return '%04d%02d%02d%02d%02d%02d' % (
- dt.year, dt.month, dt.day,
- dt.hour, dt.minute, dt.second,
+ dt.year,
+ dt.month,
+ dt.day,
+ dt.hour,
+ dt.minute,
+ dt.second,
)
+
def test_datetime_to_cdx() -> None:
- assert "20201028235103" == datetime_to_cdx(datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3))
+ assert "20201028235103" == datetime_to_cdx(
+ datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3))
+
-def requests_retry_session(retries=10, backoff_factor=3,
- status_forcelist=(500, 502, 504), session=None) -> requests.Session:
+def requests_retry_session(retries=10,
+ backoff_factor=3,
+ status_forcelist=(500, 502, 504),
+ session=None) -> requests.Session:
"""
From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
"""
@@ -262,6 +286,7 @@ def requests_retry_session(retries=10, backoff_factor=3,
session.mount('https://', adapter)
return session
+
def sanitize_fs_path(path: str) -> str:
"""
From: https://stackoverflow.com/questions/13939120/sanitizing-a-file-path-in-python/66950540#66950540
@@ -271,6 +296,7 @@ def sanitize_fs_path(path: str) -> str:
# - making the path relative
return os.path.relpath(os.path.normpath(os.path.join("/", path)), "/")
+
def test_sanitize_fs_path() -> None:
assert sanitize_fs_path("/thing.png") == "thing.png"
assert sanitize_fs_path("../../thing.png") == "thing.png"
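Note: the misc.py timestamp helpers keep their behavior; only argument wrapping changed. A self-contained sketch consistent with the tests shown above (the strptime format inside parse_cdx_datetime is an assumption here, since the parsing line itself sits outside the visible hunk):

import datetime

def datetime_to_cdx(dt: datetime.datetime) -> str:
    return '%04d%02d%02d%02d%02d%02d' % (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)

def parse_cdx_datetime(dt_str: str):
    if not dt_str:
        return None
    try:
        return datetime.datetime.strptime(dt_str, "%Y%m%d%H%M%S")
    except Exception:
        return None

dt = datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)
assert datetime_to_cdx(dt) == "20201028235103"
assert parse_cdx_datetime("20201028235103") == dt
assert parse_cdx_datetime("asdf") is None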
diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py
index 2fb34b8..190672d 100644
--- a/python/sandcrawler/pdfextract.py
+++ b/python/sandcrawler/pdfextract.py
@@ -1,4 +1,3 @@
-
import datetime
import json
import sys
@@ -153,19 +152,20 @@ BAD_PDF_SHA1HEX = [
"fd9bd560662e070b222d63052830837829c490f0",
]
+
@dataclass
class PdfExtractResult:
sha1hex: str
status: str
error_msg: Optional[str] = None
- file_meta: Optional[Dict[str,Any]] = None
+ file_meta: Optional[Dict[str, Any]] = None
text: Optional[str] = None
page0_thumbnail: Optional[bytes] = None
has_page0_thumbnail: bool = False
meta_xml: Optional[str] = None
- pdf_info: Optional[Dict[str,Any]] = None
- pdf_extra: Optional[Dict[str,Any]] = None
- source: Optional[Dict[str,Any]] = None
+ pdf_info: Optional[Dict[str, Any]] = None
+ pdf_extra: Optional[Dict[str, Any]] = None
+ source: Optional[Dict[str, Any]] = None
def to_pdftext_dict(self) -> dict:
"""
@@ -221,7 +221,8 @@ class PdfExtractResult:
)
else:
pdf_extra = dict()
- for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id', 'pdf_version'):
+ for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id',
+ 'pdf_version'):
if record.get(k):
pdf_extra[k] = record[k]
return PdfExtractResult(
@@ -255,7 +256,7 @@ class PdfExtractResult:
metadata_json = json.dumps(metadata, sort_keys=True)
return (
self.sha1hex,
- datetime.datetime.now(), # updated
+ datetime.datetime.now(), # updated
self.status,
self.has_page0_thumbnail,
pdf_extra.get('page_count'),
@@ -269,7 +270,7 @@ class PdfExtractResult:
)
-def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtractResult:
+def process_pdf(blob: bytes, thumb_size=(180, 300), thumb_type="JPEG") -> PdfExtractResult:
"""
A known issue is that output text is in "physical layout" mode, which means
columns will be side-by-side. We would prefer a single stream of tokens!
@@ -330,7 +331,8 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr
renderer = poppler.PageRenderer()
try:
full_img = renderer.render_page(page0)
- img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw', "BGRA", 0, 1)
+ img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw',
+ "BGRA", 0, 1)
img.thumbnail(thumb_size, Image.BICUBIC)
buf = BytesIO()
img.save(buf, thumb_type)
@@ -356,14 +358,14 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr
)
# Kafka message size limit; cap at about 1 MByte
- if len(full_text)> 1000000:
+ if len(full_text) > 1000000:
return PdfExtractResult(
sha1hex=sha1hex,
status='text-too-large',
error_msg="full_text chars: {}".format(len(full_text)),
file_meta=file_meta,
)
- if len(pdf.metadata)> 1000000:
+ if len(pdf.metadata) > 1000000:
return PdfExtractResult(
sha1hex=sha1hex,
status='text-too-large',
@@ -414,8 +416,8 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr
),
)
+
class PdfExtractWorker(SandcrawlerFetchWorker):
-
def __init__(self, wayback_client=None, sink=None, **kwargs):
super().__init__(wayback_client=wayback_client)
self.wayback_client = wayback_client
@@ -445,12 +447,12 @@ class PdfExtractWorker(SandcrawlerFetchWorker):
self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
return result.to_pdftext_dict()
+
class PdfExtractBlobWorker(SandcrawlerWorker):
"""
This is sort of like PdfExtractWorker, except it receives blobs directly,
instead of fetching blobs from some remote store.
"""
-
def __init__(self, sink=None, **kwargs):
super().__init__()
self.sink = sink
@@ -466,4 +468,3 @@ class PdfExtractBlobWorker(SandcrawlerWorker):
self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex)
return result.to_pdftext_dict()
-
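Note: the pdfextract.py thumbnail path is unchanged apart from line wrapping. A standalone sketch of the Pillow thumbnailing step, using a synthetic image in place of a poppler-rendered page (the RGB conversion is added here so the JPEG save works without an alpha channel; it is not part of the code above):

from io import BytesIO
from PIL import Image

def make_page0_thumbnail(img: Image.Image, thumb_size=(180, 300), thumb_type="JPEG") -> bytes:
    img = img.convert("RGB")
    img.thumbnail(thumb_size, Image.BICUBIC)
    buf = BytesIO()
    img.save(buf, thumb_type)
    return buf.getvalue()

page0_thumbnail = make_page0_thumbnail(Image.new("RGBA", (612, 792), "white"))
assert len(page0_thumbnail) > 0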
diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py
index 7d03357..e3d4a54 100644
--- a/python/sandcrawler/pdftrio.py
+++ b/python/sandcrawler/pdftrio.py
@@ -1,4 +1,3 @@
-
import time
import requests
@@ -8,7 +7,6 @@ from .workers import SandcrawlerFetchWorker, SandcrawlerWorker
class PdfTrioClient(object):
-
def __init__(self, host_url="http://pdftrio.qa.fatcat.wiki", **kwargs):
self.host_url = host_url
self.http_session = requests_retry_session(retries=3, backoff_factor=3)
@@ -51,9 +49,7 @@ class PdfTrioClient(object):
'error_msg': 'pdftrio request connection timout',
}
- info = dict(
- status_code=pdftrio_response.status_code,
- )
+ info = dict(status_code=pdftrio_response.status_code, )
if pdftrio_response.status_code == 200:
resp_json = pdftrio_response.json()
assert 'ensemble_score' in resp_json
@@ -72,7 +68,6 @@ class PdfTrioWorker(SandcrawlerFetchWorker):
"""
This class is basically copied directly from GrobidWorker
"""
-
def __init__(self, pdftrio_client, wayback_client=None, sink=None, **kwargs):
super().__init__(wayback_client=wayback_client)
self.pdftrio_client = pdftrio_client
@@ -103,12 +98,12 @@ class PdfTrioWorker(SandcrawlerFetchWorker):
result['timing']['fetch_sec'] = fetch_sec
return result
+
class PdfTrioBlobWorker(SandcrawlerWorker):
"""
This is sort of like PdfTrioWorker, except it receives blobs directly,
instead of fetching blobs from some remote store.
"""
-
def __init__(self, pdftrio_client, sink=None, mode="auto", **kwargs):
super().__init__()
self.pdftrio_client = pdftrio_client
@@ -128,4 +123,3 @@ class PdfTrioBlobWorker(SandcrawlerWorker):
total_sec=time.time() - start_process,
)
return result
-
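Note: pdftrio.py keeps using requests_retry_session(retries=3, backoff_factor=3) for its HTTP calls. A hedged sketch of that pattern applied to a scoring request; the '/classify' path and the timeout are hypothetical, while the retry setup and the 'ensemble_score' field come from the code shown:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def requests_retry_session(retries=3, backoff_factor=3,
                           status_forcelist=(500, 502, 504)) -> requests.Session:
    session = requests.Session()
    retry = Retry(total=retries, read=retries, connect=retries,
                  backoff_factor=backoff_factor, status_forcelist=status_forcelist)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

def score_pdf(blob: bytes, host_url: str = "http://pdftrio.qa.fatcat.wiki") -> dict:
    # "/classify" is a placeholder route, not taken from the diff
    resp = requests_retry_session().post(host_url + "/classify", data=blob, timeout=60.0)
    info = dict(status_code=resp.status_code)
    if resp.status_code == 200:
        resp_json = resp.json()
        assert 'ensemble_score' in resp_json
        info.update(resp_json)
    return info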
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 66a36bc..44c03f2 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -1,4 +1,3 @@
-
"""
cdx
- read raw CDX, filter
@@ -32,7 +31,6 @@ from sandcrawler.workers import SandcrawlerWorker
class PersistCdxWorker(SandcrawlerWorker):
-
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -56,8 +54,8 @@ class PersistCdxWorker(SandcrawlerWorker):
self.db.commit()
return []
+
class PersistIngestFileResultWorker(SandcrawlerWorker):
-
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -78,8 +76,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
# backwards compat hacks; transform request to look like current schema
if raw.get('ingest_type') == 'file':
raw['ingest_type'] = 'pdf'
- if (not raw.get('link_source')
- and raw.get('base_url')
+ if (not raw.get('link_source') and raw.get('base_url')
and raw.get('ext_ids', {}).get('doi')
and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])):
# set link_source(_id) for old ingest requests
@@ -119,7 +116,6 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if not request['request']:
request['request'] = None
return request
-
def file_result_to_row(self, raw: dict) -> Optional[dict]:
"""
@@ -137,7 +133,8 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
ingest_type = raw['request'].get('ingest_type')
if ingest_type == 'file':
ingest_type = 'pdf'
- if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', 'dataset-file'):
+ if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset',
+ 'dataset-file'):
self.counts['skip-ingest-type'] += 1
return None
if raw['status'] in ("existing", ):
@@ -153,7 +150,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
if terminal:
result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url')
result['terminal_dt'] = terminal.get('terminal_dt')
- result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code')
+ result['terminal_status_code'] = terminal.get(
+ 'terminal_status_code') or terminal.get('status_code') or terminal.get(
+ 'http_code')
if result['terminal_status_code']:
result['terminal_status_code'] = int(result['terminal_status_code'])
result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
@@ -215,9 +214,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
'manifest': raw.get('manifest'),
}
if result.get('fileset_bundle'):
- result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get('archiveorg_item_bundle_path')
- result['web_bundle_url'] = result['fileset_bundle'].get('terminal', {}).get('terminal_url')
- result['web_bundle_dt'] = result['fileset_bundle'].get('terminal', {}).get('terminal_dt')
+ result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get(
+ 'archiveorg_item_bundle_path')
+ result['web_bundle_url'] = result['fileset_bundle'].get('terminal',
+ {}).get('terminal_url')
+ result['web_bundle_dt'] = result['fileset_bundle'].get('terminal',
+ {}).get('terminal_dt')
return result
def push_batch(self, batch):
@@ -243,7 +245,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
# these schemas match, so can just pass through
cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')]
- revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')]
+ revisit_cdx_batch = [
+ r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')
+ ]
cdx_batch.extend(revisit_cdx_batch)
# filter to full CDX lines, with full warc_paths (not liveweb)
cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])]
@@ -258,24 +262,31 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.counts['insert-file_meta'] += resp[0]
self.counts['update-file_meta'] += resp[1]
- html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')]
+ html_meta_batch = [
+ self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')
+ ]
if html_meta_batch:
resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="update")
self.counts['insert-html_meta'] += resp[0]
self.counts['update-html_meta'] += resp[1]
- fileset_platform_batch = [self.result_to_platform_row(raw) for raw in batch if raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')]
+ fileset_platform_batch = [
+ self.result_to_platform_row(raw) for raw in batch if
+ raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')
+ ]
fileset_platform_batch = [p for p in fileset_platform_batch if p]
if fileset_platform_batch:
- resp = self.db.insert_ingest_fileset_platform(self.cur, fileset_platform_batch, on_conflict="update")
+ resp = self.db.insert_ingest_fileset_platform(self.cur,
+ fileset_platform_batch,
+ on_conflict="update")
self.counts['insert-fileset_platform'] += resp[0]
self.counts['update-fileset_platform'] += resp[1]
self.db.commit()
return []
-class PersistIngestFilesetWorker(SandcrawlerWorker):
+class PersistIngestFilesetWorker(SandcrawlerWorker):
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -287,8 +298,8 @@ class PersistIngestFilesetWorker(SandcrawlerWorker):
"""
raise NotImplementedError
-class PersistIngestRequestWorker(PersistIngestFileResultWorker):
+class PersistIngestRequestWorker(PersistIngestFileResultWorker):
def __init__(self, db_url, **kwargs):
super().__init__(db_url=db_url)
@@ -315,8 +326,8 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker):
self.db.commit()
return []
-class PersistGrobidWorker(SandcrawlerWorker):
+class PersistGrobidWorker(SandcrawlerWorker):
def __init__(self, db_url, **kwargs):
super().__init__()
self.grobid = GrobidClient()
@@ -406,7 +417,6 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
This could be refactored into a "Sink" type with an even thinner wrapper.
"""
-
def __init__(self, output_dir):
super().__init__()
self.output_dir = output_dir
@@ -424,7 +434,7 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
if record.get('status_code') != 200 or not record.get('tei_xml'):
return False
- assert(len(record['key'])) == 40
+ assert (len(record['key'])) == 40
p = "{}/{}".format(self.output_dir, self._blob_path(record['key']))
os.makedirs(os.path.dirname(p), exist_ok=True)
with open(p, 'w') as f:
@@ -434,7 +444,6 @@ class PersistGrobidDiskWorker(SandcrawlerWorker):
class PersistPdfTrioWorker(SandcrawlerWorker):
-
def __init__(self, db_url, **kwargs):
super().__init__()
self.db = SandcrawlerPostgresClient(db_url)
@@ -458,7 +467,10 @@ class PersistPdfTrioWorker(SandcrawlerWorker):
self.counts['insert-pdftrio'] += resp[0]
self.counts['update-pdftrio'] += resp[1]
- file_meta_batch = [r['file_meta'] for r in batch if r['pdf_trio']['status'] == "success" and r.get('file_meta')]
+ file_meta_batch = [
+ r['file_meta'] for r in batch
+ if r['pdf_trio']['status'] == "success" and r.get('file_meta')
+ ]
resp = self.db.insert_file_meta(self.cur, file_meta_batch)
self.counts['insert-file-meta'] += resp[0]
self.counts['update-file-meta'] += resp[1]
@@ -473,7 +485,6 @@ class PersistPdfTextWorker(SandcrawlerWorker):
Should keep batch sizes small.
"""
-
def __init__(self, db_url, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
@@ -545,7 +556,6 @@ class PersistThumbnailWorker(SandcrawlerWorker):
This worker *must* be used with raw kafka mode; thumbnails are *not*
wrapped in JSON like most sandcrawler kafka messages.
"""
-
def __init__(self, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
@@ -583,7 +593,6 @@ class GenericPersistDocWorker(SandcrawlerWorker):
Objects are assumed to be JSON-wrapped strings.
"""
-
def __init__(self, **kwargs):
super().__init__()
self.s3 = SandcrawlerMinioClient(
@@ -624,7 +633,6 @@ class PersistXmlDocWorker(GenericPersistDocWorker):
Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
sandcrawler database (SQL).
"""
-
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.s3_extension = kwargs.get('s3_extension', ".jats.xml")
@@ -637,7 +645,6 @@ class PersistHtmlTeiXmlWorker(GenericPersistDocWorker):
Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to
sandcrawler database (SQL).
"""
-
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.s3_extension = kwargs.get('s3_extension', ".tei.xml")
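
One hunk above (inside PersistIngestFileResultWorker) reflows the backwards-compat check that upgrades old ingest requests: ingest_type "file" becomes "pdf", and requests whose base_url is just a DOI landing URL get a link_source backfilled. As a standalone sketch, hedged because the hunk cuts off right after the comment and the exact backfilled values are an assumption:

    # Hedged sketch of the backwards-compat transform reformatted above.
    # Field names follow the hunk; the concrete link_source/link_source_id
    # values are assumptions ("set link_source(_id) for old ingest requests").
    def normalize_old_request(raw: dict) -> dict:
        if raw.get('ingest_type') == 'file':
            raw['ingest_type'] = 'pdf'
        doi = raw.get('ext_ids', {}).get('doi')
        if (not raw.get('link_source') and raw.get('base_url') and doi
                and raw['base_url'] == "https://doi.org/{}".format(doi)):
            # assumed backfill for old DOI-based requests
            raw['link_source'] = 'doi'
            raw['link_source_id'] = doi
        return raw
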
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index d8a4016..7135f4c 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -1,4 +1,3 @@
-
import json
import multiprocessing.pool
import signal
@@ -21,7 +20,6 @@ class SandcrawlerWorker(object):
Usually these get "pushed" into by a RecordPusher. Output goes to another
worker (pipeline-style), or defaults to stdout.
"""
-
def __init__(self):
self.counts = Counter()
self.sink = None
@@ -62,9 +60,9 @@ class SandcrawlerWorker(object):
multithreading or if signal-based timeouts are used elsewhere in the
same process.
"""
-
def timeout_handler(signum, frame):
raise TimeoutError("timeout processing record")
+
signal.signal(signal.SIGALRM, timeout_handler)
resp = None
signal.alarm(int(timeout))
@@ -72,7 +70,7 @@ class SandcrawlerWorker(object):
resp = self.push_record(task, key=key)
except TimeoutError:
self.counts['timeout'] += 1
- resp = self.timeout_response(task) # pylint: disable=assignment-from-none
+ resp = self.timeout_response(task) # pylint: disable=assignment-from-none
# TODO: what if it is this push_record() itself that is timing out?
if resp and self.sink:
self.sink.push_record(resp)
@@ -113,7 +111,6 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg,
PDFs) from wayback, archive.org, or other sources.
"""
-
def __init__(self, wayback_client, **kwargs):
super().__init__(**kwargs)
self.wayback_client = wayback_client
@@ -178,7 +175,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
)
blob = resp.content
else:
- raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
+ raise ValueError(
+ "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed")
if not blob:
return dict(
key=default_key,
@@ -192,8 +190,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
blob=blob,
)
-class MultiprocessWrapper(SandcrawlerWorker):
+class MultiprocessWrapper(SandcrawlerWorker):
def __init__(self, worker, sink, jobs=None):
self.counts = Counter()
self.worker = worker
@@ -226,21 +224,21 @@ class MultiprocessWrapper(SandcrawlerWorker):
print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
return worker_counts
+
class BlackholeSink(SandcrawlerWorker):
"""
Dummy SandcrawlerWorker that doesn't do or process anything.
Useful for tests.
"""
-
def push_record(self, task, key=None):
return
def push_batch(self, tasks):
return
-class KafkaSink(SandcrawlerWorker):
+class KafkaSink(SandcrawlerWorker):
def __init__(self, kafka_hosts, produce_topic, **kwargs):
self.sink = None
self.counts = Counter()
@@ -249,13 +247,12 @@ class KafkaSink(SandcrawlerWorker):
config = self.producer_config({
'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
+ 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes
'api.version.request': True,
'api.version.fallback.ms': 0,
})
self.producer = Producer(config)
-
@staticmethod
def _fail_fast(err, msg):
if err is not None:
@@ -270,7 +267,7 @@ class KafkaSink(SandcrawlerWorker):
'delivery.report.only.error': True,
'default.topic.config': {
'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ 'request.required.acks': -1, # all brokers must confirm
}
})
return config
@@ -285,11 +282,7 @@ class KafkaSink(SandcrawlerWorker):
msg = msg.encode('utf-8')
assert type(msg) == bytes
- self.producer.produce(
- self.produce_topic,
- msg,
- key=key,
- on_delivery=self._fail_fast)
+ self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast)
self.counts['produced'] += 1
# check for errors etc
@@ -308,7 +301,6 @@ class KafkaCompressSink(KafkaSink):
"""
Variant of KafkaSink for large documents. Used for, eg, GROBID output.
"""
-
def producer_config(self, kafka_config):
config = kafka_config.copy()
config.update({
@@ -319,7 +311,7 @@ class KafkaCompressSink(KafkaSink):
'delivery.report.only.error': True,
'default.topic.config': {
'message.timeout.ms': 30000,
- 'request.required.acks': -1, # all brokers must confirm
+ 'request.required.acks': -1, # all brokers must confirm
}
})
return config
@@ -330,7 +322,6 @@ class RecordPusher:
Base class for different record sources to be pushed into workers. Pretty
trivial interface, just wraps an importer and pushes records into it.
"""
-
def __init__(self, worker, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -348,7 +339,6 @@ class RecordPusher:
class JsonLinePusher(RecordPusher):
-
def __init__(self, worker, json_file, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -387,7 +377,6 @@ class JsonLinePusher(RecordPusher):
class CdxLinePusher(RecordPusher):
-
def __init__(self, worker, cdx_file, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -409,7 +398,8 @@ class CdxLinePusher(RecordPusher):
if not record:
self.counts['skip-parse'] += 1
continue
- if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses:
+ if self.filter_http_statuses and record[
+ 'http_status'] not in self.filter_http_statuses:
self.counts['skip-http_status'] += 1
continue
if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes:
@@ -434,7 +424,6 @@ class CdxLinePusher(RecordPusher):
class ZipfilePusher(RecordPusher):
-
def __init__(self, worker, zipfile_path, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -472,8 +461,8 @@ class ZipfilePusher(RecordPusher):
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
-class KafkaJsonPusher(RecordPusher):
+class KafkaJsonPusher(RecordPusher):
def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs):
self.counts = Counter()
self.worker = worker
@@ -499,12 +488,11 @@ class KafkaJsonPusher(RecordPusher):
# case where there is one update and thousands of creates;
# update would be lingering in worker, and if worker crashed
# never created. Not great.
- batch = self.consumer.consume(
- num_messages=self.batch_size,
- timeout=self.poll_interval)
+ batch = self.consumer.consume(num_messages=self.batch_size,
+ timeout=self.poll_interval)
print("... got {} kafka messages ({}sec poll interval)".format(
- len(batch), self.poll_interval),
- file=sys.stderr)
+ len(batch), self.poll_interval),
+ file=sys.stderr)
if not batch:
# TODO: could have some larger timeout here and
# self.worker.finish() if it's been more than, eg, a couple
@@ -541,7 +529,9 @@ class KafkaJsonPusher(RecordPusher):
while not done:
try:
# use timeouts; don't want kafka itself to timeout
- self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec)
+ self.worker.push_record_timeout(record,
+ key=msg.key(),
+ timeout=self.process_timeout_sec)
break
except SandcrawlerBackoffError as be:
print("Backing off for 200 seconds: {}".format(be))
@@ -611,14 +601,14 @@ def make_kafka_consumer(hosts, consume_topic, group):
for p in partitions:
if p.error:
raise KafkaException(p.error)
- print("Kafka partitions rebalanced: {} / {}".format(
- consumer, partitions),
- file=sys.stderr)
+ print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions),
+ file=sys.stderr)
consumer = Consumer(conf)
# NOTE: it's actually important that topic_name *not* be bytes (UTF-8
# encoded)
- consumer.subscribe([topic_name],
+ consumer.subscribe(
+ [topic_name],
on_assign=on_rebalance,
on_revoke=on_rebalance,
)
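
The first workers.py hunks above reformat the push_record_timeout() helper, which wraps a single push_record() call in a SIGALRM-based timeout and, as its docstring warns, is therefore unsafe with multithreading or other SIGALRM users in the same process. The pattern in isolation, as a self-contained sketch:

    # Sketch of the SIGALRM timeout pattern used by push_record_timeout() above.
    # Process-wide alarm: do not combine with threads or other SIGALRM handlers.
    import signal

    def run_with_timeout(func, arg, timeout=300):
        def timeout_handler(signum, frame):
            raise TimeoutError("timeout processing record")

        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(int(timeout))
        resp = None
        try:
            resp = func(arg)
        except TimeoutError:
            # the worker increments counts['timeout'] and may substitute a
            # timeout_response(); here we just return None
            resp = None
        finally:
            signal.alarm(0)  # always clear any pending alarm
        return resp
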
diff --git a/python/sandcrawler/xml.py b/python/sandcrawler/xml.py
index 7a0086d..83d53d4 100644
--- a/python/sandcrawler/xml.py
+++ b/python/sandcrawler/xml.py
@@ -1,4 +1,3 @@
-
import xml.etree.ElementTree as ET