From 05bd7cbcc62588e431c5efd533189e246b2a997e Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 26 Oct 2021 12:54:37 -0700 Subject: make fmt --- python/sandcrawler/__init__.py | 18 +-- python/sandcrawler/db.py | 150 +++++++++++------------- python/sandcrawler/fileset_platforms.py | 169 ++++++++++++++++----------- python/sandcrawler/fileset_strategies.py | 68 ++++++----- python/sandcrawler/fileset_types.py | 6 +- python/sandcrawler/grobid.py | 30 ++--- python/sandcrawler/html.py | 49 ++++---- python/sandcrawler/html_metadata.py | 37 +++--- python/sandcrawler/ia.py | 192 ++++++++++++++++++++----------- python/sandcrawler/ingest_file.py | 145 ++++++++++++----------- python/sandcrawler/ingest_fileset.py | 101 +++++++++------- python/sandcrawler/ingest_html.py | 120 +++++++++++-------- python/sandcrawler/minio.py | 2 - python/sandcrawler/misc.py | 64 ++++++++--- python/sandcrawler/pdfextract.py | 29 ++--- python/sandcrawler/pdftrio.py | 10 +- python/sandcrawler/persist.py | 61 +++++----- python/sandcrawler/workers.py | 60 ++++------ python/sandcrawler/xml.py | 1 - 19 files changed, 741 insertions(+), 571 deletions(-) (limited to 'python/sandcrawler') diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py index bf2d92d..46735eb 100644 --- a/python/sandcrawler/__init__.py +++ b/python/sandcrawler/__init__.py @@ -1,14 +1,16 @@ - from .db import SandcrawlerPostgresClient, SandcrawlerPostgrestClient from .grobid import GrobidBlobWorker, GrobidClient, GrobidWorker -from .ia import (CdxApiClient, CdxApiError, CdxPartial, CdxRow, PetaboxError, ResourceResult, SavePageNowClient, - SavePageNowError, WarcResource, WaybackClient, WaybackContentError, WaybackError) +from .ia import (CdxApiClient, CdxApiError, CdxPartial, CdxRow, PetaboxError, ResourceResult, + SavePageNowClient, SavePageNowError, WarcResource, WaybackClient, + WaybackContentError, WaybackError) from .ingest_file import IngestFileWorker from .ingest_fileset import IngestFilesetWorker -from .misc import b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path, parse_cdx_datetime, parse_cdx_line +from .misc import (b32_hex, clean_url, gen_file_metadata, gen_file_metadata_path, + parse_cdx_datetime, parse_cdx_line) from .pdfextract import PdfExtractBlobWorker, PdfExtractWorker from .pdftrio import PdfTrioBlobWorker, PdfTrioClient, PdfTrioWorker -from .persist import (PersistCdxWorker, PersistGrobidDiskWorker, PersistGrobidWorker, PersistIngestFileResultWorker, - PersistIngestRequestWorker, PersistPdfTextWorker, PersistPdfTrioWorker, PersistThumbnailWorker) -from .workers import (BlackholeSink, CdxLinePusher, JsonLinePusher, KafkaCompressSink, KafkaJsonPusher, KafkaSink, - MultiprocessWrapper, ZipfilePusher) +from .persist import (PersistCdxWorker, PersistGrobidDiskWorker, PersistGrobidWorker, + PersistIngestFileResultWorker, PersistIngestRequestWorker, + PersistPdfTextWorker, PersistPdfTrioWorker, PersistThumbnailWorker) +from .workers import (BlackholeSink, CdxLinePusher, JsonLinePusher, KafkaCompressSink, + KafkaJsonPusher, KafkaSink, MultiprocessWrapper, ZipfilePusher) diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index 4dcdb0e..360add9 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -1,4 +1,3 @@ - import datetime import json from typing import Optional @@ -9,17 +8,16 @@ import requests class SandcrawlerPostgrestClient: - def __init__(self, api_url="http://wbgrp-svc506.us.archive.org:3030", **kwargs): self.api_url = api_url def get_cdx(self, url): - resp = 
requests.get(self.api_url + "/cdx", params=dict(url='eq.'+url)) + resp = requests.get(self.api_url + "/cdx", params=dict(url='eq.' + url)) resp.raise_for_status() return resp.json() or None def get_grobid(self, sha1): - resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.'+sha1)) + resp = requests.get(self.api_url + "/grobid", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() resp = resp.json() if resp: @@ -28,7 +26,7 @@ class SandcrawlerPostgrestClient: return None def get_pdftrio(self, sha1): - resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.'+sha1)) + resp = requests.get(self.api_url + "/pdftrio", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() resp = resp.json() if resp: @@ -37,7 +35,7 @@ class SandcrawlerPostgrestClient: return None def get_pdf_meta(self, sha1): - resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.'+sha1)) + resp = requests.get(self.api_url + "/pdf_meta", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() resp = resp.json() if resp: @@ -58,7 +56,7 @@ class SandcrawlerPostgrestClient: return None def get_file_meta(self, sha1): - resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.'+sha1)) + resp = requests.get(self.api_url + "/file_meta", params=dict(sha1hex='eq.' + sha1)) resp.raise_for_status() resp = resp.json() if resp: @@ -91,7 +89,7 @@ class SandcrawlerPostgrestClient: return None def get_crossref(self, doi): - resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.'+doi)) + resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.' + doi)) resp.raise_for_status() resp = resp.json() if resp: @@ -99,8 +97,8 @@ class SandcrawlerPostgrestClient: else: return None -class SandcrawlerPostgresClient: +class SandcrawlerPostgresClient: def __init__(self, db_url, **kwargs): self.conn = psycopg2.connect(db_url) @@ -135,14 +133,8 @@ class SandcrawlerPostgresClient: batch = [d for d in batch if d.get('warc_path')] if not batch: return (0, 0) - batch = [(d['url'], - d['datetime'], - d['sha1hex'], - d['mimetype'], - d['warc_path'], - int(d['warc_csize']), - int(d['warc_offset'])) - for d in batch] + batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], d['warc_path'], + int(d['warc_csize']), int(d['warc_offset'])) for d in batch] # filter out duplicate rows by key (url, datetime) batch_dict = dict() for b in batch: @@ -170,12 +162,8 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [(d['sha1hex'], - d['sha256hex'], - d['md5hex'], - int(d['size_bytes']), - d['mimetype']) - for d in batch] + batch = [(d['sha1hex'], d['sha256hex'], d['md5hex'], int(d['size_bytes']), + d['mimetype']) for d in batch] # filter out duplicate rows by key (sha1hex) batch_dict = dict() for b in batch: @@ -215,15 +203,15 @@ class SandcrawlerPostgresClient: r[k] = r['metadata'].get(k) r['metadata'].pop(k, None) r['metadata'] = json.dumps(r['metadata'], sort_keys=True) - batch = [(d['key'], - d.get('grobid_version') or None, - d['status_code'], - d['status'], - d.get('fatcat_release') or None, - d.get('updated') or datetime.datetime.now(), - d.get('metadata') or None , - ) - for d in batch] + batch = [( + d['key'], + d.get('grobid_version') or None, + d['status_code'], + d['status'], + d.get('fatcat_release') or None, + d.get('updated') or datetime.datetime.now(), + d.get('metadata') or None, + ) for d in batch] # filter out duplicate rows by key (sha1hex) batch_dict = 
dict() for b in batch: @@ -331,20 +319,18 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [ - ( - d['key'], - d.get('updated') or datetime.datetime.now(), - d['status_code'], - d['status'], - d.get('versions', {}).get('pdftrio_version') or None, - d.get('versions', {}).get('models_date') or None, - d.get('ensemble_score'), - d.get('bert_score'), - d.get('linear_score'), - d.get('image_score'), - ) - for d in batch] + batch = [( + d['key'], + d.get('updated') or datetime.datetime.now(), + d['status_code'], + d['status'], + d.get('versions', {}).get('pdftrio_version') or None, + d.get('versions', {}).get('models_date') or None, + d.get('ensemble_score'), + d.get('bert_score'), + d.get('linear_score'), + d.get('image_score'), + ) for d in batch] # filter out duplicate rows by key (sha1hex) batch_dict = dict() for b in batch: @@ -373,15 +359,15 @@ class SandcrawlerPostgresClient: extra[k] = r[k] if extra: r['extra'] = json.dumps(extra, sort_keys=True) - batch = [(d['link_source'], - d['link_source_id'], - d['ingest_type'], - d['base_url'], - d.get('ingest_request_source'), - d.get('release_stage') or None, - d.get('extra') or None, - ) - for d in batch] + batch = [( + d['link_source'], + d['link_source_id'], + d['ingest_type'], + d['base_url'], + d.get('ingest_request_source'), + d.get('release_stage') or None, + d.get('extra') or None, + ) for d in batch] # filter out duplicate rows by key (link_source, link_source_id, ingest_type, base_url) batch_dict = dict() for b in batch: @@ -412,16 +398,16 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [(d['ingest_type'], - d['base_url'], - bool(d['hit']), - d['status'], - d.get('terminal_url'), - d.get('terminal_dt'), - d.get('terminal_status_code'), - d.get('terminal_sha1hex'), - ) - for d in batch] + batch = [( + d['ingest_type'], + d['base_url'], + bool(d['hit']), + d['status'], + d.get('terminal_url'), + d.get('terminal_dt'), + d.get('terminal_status_code'), + d.get('terminal_sha1hex'), + ) for d in batch] # filter out duplicate rows by key (ingest_type, base_url) batch_dict = dict() for b in batch: @@ -459,23 +445,23 @@ class SandcrawlerPostgresClient: else: raise NotImplementedError("on_conflict: {}".format(on_conflict)) sql += " RETURNING xmax;" - batch = [(d['ingest_type'], - d['base_url'], - bool(d['hit']), - d['status'], - d.get('platform_name'), - d.get('platform_domain'), - d.get('platform_id'), - d.get('ingest_strategy'), - d.get('total_size'), - d.get('file_count'), - d.get('archiveorg_item_name'), - d.get('archiveorg_item_bundle_path'), - d.get('web_bundle_url'), - d.get('web_bundle_dt'), - d.get('manifest'), - ) - for d in batch] + batch = [( + d['ingest_type'], + d['base_url'], + bool(d['hit']), + d['status'], + d.get('platform_name'), + d.get('platform_domain'), + d.get('platform_id'), + d.get('ingest_strategy'), + d.get('total_size'), + d.get('file_count'), + d.get('archiveorg_item_name'), + d.get('archiveorg_item_bundle_path'), + d.get('web_bundle_url'), + d.get('web_bundle_dt'), + d.get('manifest'), + ) for d in batch] # filter out duplicate rows by key (ingest_type, base_url) batch_dict = dict() for b in batch: diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py index 92fed37..f3441c9 100644 --- a/python/sandcrawler/fileset_platforms.py +++ b/python/sandcrawler/fileset_platforms.py @@ -1,4 +1,3 @@ - 
import gzip import json import sys @@ -16,17 +15,18 @@ from sandcrawler.ia import ResourceResult class FilesetPlatformHelper(): - def __init__(self): self.platform_name = 'unknown' - def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> bool: """ Does this request look like it matches this platform? """ raise NotImplementedError() - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -40,19 +40,18 @@ class FilesetPlatformHelper(): # XXX: while developing ArchiveorgFileset path #return IngestStrategy.ArchiveorgFileset if len(item.manifest) == 1: - if total_size < 64*1024*1024: + if total_size < 64 * 1024 * 1024: return IngestStrategy.WebFile else: return IngestStrategy.ArchiveorgFile else: - if largest_size < 64*1024*1024 and total_size < 128*1024*1024*1024: + if largest_size < 64 * 1024 * 1024 and total_size < 128 * 1024 * 1024 * 1024: return IngestStrategy.WebFileset else: return IngestStrategy.ArchiveorgFileset class DataverseHelper(FilesetPlatformHelper): - def __init__(self): super().__init__() self.platform_name = 'dataverse' @@ -122,8 +121,8 @@ class DataverseHelper(FilesetPlatformHelper): "file_id": file_id, } - - def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> bool: if resource and resource.terminal_url: url = resource.terminal_url else: @@ -146,7 +145,8 @@ class DataverseHelper(FilesetPlatformHelper): return True - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) @@ -179,22 +179,27 @@ class DataverseHelper(FilesetPlatformHelper): if parsed_id['file_id']: # XXX: maybe we could support this? - raise PlatformScopeError(f"only entire dataverse datasets can be archived with this tool") + raise PlatformScopeError( + f"only entire dataverse datasets can be archived with this tool") # 1b. if we didn't get a version number from URL, fetch it from API if not dataset_version: - resp = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}") + resp = self.session.get( + f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}" + ) resp.raise_for_status() obj = resp.json() obj_latest = obj['data']['latestVersion'] dataset_version = f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}" # 2. 
API fetch - resp = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}") + resp = self.session.get( + f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}" + ) resp.raise_for_status() obj = resp.json() - obj_latest= obj['data']['latestVersion'] + obj_latest = obj['data']['latestVersion'] assert dataset_version == f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}" assert platform_id == obj_latest['datasetPersistentId'] @@ -212,15 +217,16 @@ class DataverseHelper(FilesetPlatformHelper): extra['version'] = row['version'] if 'description' in df: extra['description'] = df['description'] - manifest.append(FilesetManifestFile( - path=df.get('originalFileName') or df['filename'], - size=df.get('originalFileSize') or df['filesize'], - md5=df['md5'], - # NOTE: don't get: sha1, sha256 - mimetype=df['contentType'], - platform_url=platform_url, - extra=extra or None, - )) + manifest.append( + FilesetManifestFile( + path=df.get('originalFileName') or df['filename'], + size=df.get('originalFileSize') or df['filesize'], + md5=df['md5'], + # NOTE: don't get: sha1, sha256 + mimetype=df['contentType'], + platform_url=platform_url, + extra=extra or None, + )) platform_sub_id = platform_id.split('/')[-1] archiveorg_item_name = f"{platform_domain}-{platform_sub_id}-v{dataset_version}" @@ -228,7 +234,8 @@ class DataverseHelper(FilesetPlatformHelper): # XXX: collection=platform_domain, collection="datasets", date=obj_latest['releaseTime'].split('T')[0], - source=f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}", + source= + f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}", ) if platform_id.startswith('doi:10.'): archiveorg_item_meta['doi'] = platform_id.replace('doi:', '') @@ -238,7 +245,8 @@ class DataverseHelper(FilesetPlatformHelper): elif block['typeName'] == 'depositor': archiveorg_item_meta['creator'] = block['value'] elif block['typeName'] == 'dsDescription': - archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue']['value'] + archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue'][ + 'value'] archiveorg_item_meta['description'] = archiveorg_item_meta.get('description', '') if obj_latest.get('termsOfUse'): @@ -252,11 +260,13 @@ class DataverseHelper(FilesetPlatformHelper): platform_id=platform_id, archiveorg_item_name=archiveorg_item_name, archiveorg_item_meta=archiveorg_item_meta, - web_bundle_url=f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original", + web_bundle_url= + f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original", # TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files) extra=dict(version=dataset_version), ) + def test_parse_dataverse_persistentid(): valid = { @@ -322,8 +332,8 @@ def test_parse_dataverse_persistentid(): except ValueError: pass -class FigshareHelper(FilesetPlatformHelper): +class FigshareHelper(FilesetPlatformHelper): def __init__(self): super().__init__() self.platform_name = 'figshare' @@ -346,7 +356,9 @@ class FigshareHelper(FilesetPlatformHelper): raise ValueError(f"not a figshare URL: {path}") comp = comp[2:] - if comp[0] in ['dataset',]: + if comp[0] in [ + 'dataset', + ]: comp = comp[1:] if len(comp) == 3 and comp[1].isdigit() and comp[2].isdigit(): @@ -356,7 
+368,8 @@ class FigshareHelper(FilesetPlatformHelper): else: raise ValueError(f"couldn't find figshare identiier: {path}") - def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> bool: if resource and resource.terminal_url: url = resource.terminal_url @@ -381,7 +394,8 @@ class FigshareHelper(FilesetPlatformHelper): return False - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -397,13 +411,15 @@ class FigshareHelper(FilesetPlatformHelper): (platform_id, dataset_version) = self.parse_figshare_url_path(components.path) assert platform_id.isdigit(), f"expected numeric: {platform_id}" - assert dataset_version and dataset_version.isdigit(), f"expected numeric: {dataset_version}" + assert dataset_version and dataset_version.isdigit( + ), f"expected numeric: {dataset_version}" # 1b. if we didn't get a version number from URL, fetch it from API # TODO: implement this code path # 2. API fetch - resp = self.session.get(f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}") + resp = self.session.get( + f"https://api.figshare.com/v2/articles/{platform_id}/versions/{dataset_version}") resp.raise_for_status() obj = resp.json() @@ -412,18 +428,21 @@ class FigshareHelper(FilesetPlatformHelper): if not obj['is_public']: raise PlatformRestrictedError(f'record not public: {platform_id} {dataset_version}') if obj['is_embargoed']: - raise PlatformRestrictedError(f'record is embargoed: {obj.get("embargo_title")} ({platform_id} {dataset_version})') + raise PlatformRestrictedError( + f'record is embargoed: {obj.get("embargo_title")} ({platform_id} {dataset_version})' + ) manifest = [] for row in obj['files']: - manifest.append(FilesetManifestFile( - path=row['name'], - size=row['size'], - md5=row['computed_md5'], - # NOTE: don't get: sha1, sha256, mimetype - platform_url=row['download_url'], - #extra=dict(), - )) + manifest.append( + FilesetManifestFile( + path=row['name'], + size=row['size'], + md5=row['computed_md5'], + # NOTE: don't get: sha1, sha256, mimetype + platform_url=row['download_url'], + #extra=dict(), + )) assert not row.get('is_link_only') authors = [] @@ -451,18 +470,23 @@ class FigshareHelper(FilesetPlatformHelper): platform_id=platform_id, archiveorg_item_name=archiveorg_item_name, archiveorg_item_meta=archiveorg_item_meta, - web_bundle_url=f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}", + web_bundle_url= + f"https://ndownloader.figshare.com/articles/{platform_id}/versions/{dataset_version}", # TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files) extra=dict(version=dataset_version), ) + def test_parse_figshare_url_path(): valid = { - "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1": ("8987858", "1"), - "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858": ("8987858", None), + 
"/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858/1": + ("8987858", "1"), + "/articles/Optimized_protocol_to_isolate_high_quality_genomic_DNA_from_different_tissues_of_a_palm_species/8987858": + ("8987858", None), "/articles/CIBERSORT_p-value_0_05/8217188/1": ("8217188", "1"), - "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4": ("12127176", "4"), + "/articles/dataset/STable_1_U-Pb_geochronologic_analyses_on_samples_xls/12127176/4": + ("12127176", "4"), } invalid = [ @@ -479,14 +503,15 @@ def test_parse_figshare_url_path(): except ValueError: pass -class ZenodoHelper(FilesetPlatformHelper): +class ZenodoHelper(FilesetPlatformHelper): def __init__(self): super().__init__() self.platform_name = 'zenodo' self.session = requests.Session() - def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> bool: if resource and resource.terminal_url: url = resource.terminal_url @@ -499,7 +524,8 @@ class ZenodoHelper(FilesetPlatformHelper): return True return False - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ @@ -535,12 +561,15 @@ class ZenodoHelper(FilesetPlatformHelper): assert obj['id'] == int(platform_id) work_id = obj['conceptrecid'] if work_id == obj['id']: - raise PlatformScopeError("got a work-level zenodo record, not a versioned record: {work_id}") + raise PlatformScopeError( + "got a work-level zenodo record, not a versioned record: {work_id}") zenodo_type = obj['metadata']['resource_type']['type'] if obj['metadata']['access_right'] != 'open': - raise PlatformRestrictedError("not publicly available ({obj['metadata']['access_right']}): {platform_domain} {platform_id}") + raise PlatformRestrictedError( + "not publicly available ({obj['metadata']['access_right']}): {platform_domain} {platform_id}" + ) manifest = [] for row in obj['files']: @@ -600,11 +629,9 @@ class ArchiveOrgHelper(FilesetPlatformHelper): 'RAR': 'application/vnd.rar', 'TAR': 'application/x-tar', '7z': 'application/x-7z-compressed', - 'HTML': 'text/html', 'Text': 'text/plain', 'PDF': 'application/pdf', - 'CSV': 'text/csv', 'XML': 'application/xml', 'JSON': 'application/json', @@ -613,17 +640,13 @@ class ArchiveOrgHelper(FilesetPlatformHelper): #'application/vnd.openxmlformats-officedocument.wordprocessingml.document', # .docx #'application/vnd.ms-excel', # .xls #'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx - - 'MP3': 'audio/mpeg', # .mp3 - - 'MP4': 'video/mp4', # .mp4 - 'MPEG': 'video/mpeg', # .mpeg - + 'MP3': 'audio/mpeg', # .mp3 + 'MP4': 'video/mp4', # .mp4 + 'MPEG': 'video/mpeg', # .mpeg 'JPEG': 'image/jpeg', 'GIF': 'image/gif', 'PNG': 'image/png', 'TIFF': 'image/tiff', - 'Unknown': None, } @@ -640,24 +663,27 @@ class ArchiveOrgHelper(FilesetPlatformHelper): if f.source != 'original': return False for suffix in [ - '_meta.sqlite', - '_archive.torrent', - '_itemimage.jpg', - '_meta.xml', - '_thumb.png', - '_files.xml', + '_meta.sqlite', + '_archive.torrent', + '_itemimage.jpg', + '_meta.xml', + 
'_thumb.png', + '_files.xml', ]: if f.name == item_name + suffix or f.name == item_name.lower() + suffix: return False if f.name.startswith('_'): return False if item_name.startswith('academictorrents_'): - for suffix in ['_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib']: + for suffix in [ + '_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib' + ]: if f.name == item_name + suffix: return False return True - def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool: + def match_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> bool: if resource and resource.terminal_url: url = resource.terminal_url @@ -672,20 +698,23 @@ class ArchiveOrgHelper(FilesetPlatformHelper): return True return False - def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: + def process_request(self, request: dict, resource: Optional[ResourceResult], + html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem: """ Fetch platform-specific metadata for this request (eg, via API calls) """ base_url_split = request['base_url'].split('/') #print(base_url_split, file=sys.stderr) - assert len(base_url_split) in [5,6] + assert len(base_url_split) in [5, 6] assert base_url_split[0] in ['http:', 'https:'] assert base_url_split[2] == 'archive.org' assert base_url_split[3] in ['details', 'download'] item_name = base_url_split[4] if len(base_url_split) == 6 and base_url_split[5]: - raise PlatformScopeError("got an archive.org file path, not download/details page; individual files not handled yet") + raise PlatformScopeError( + "got an archive.org file path, not download/details page; individual files not handled yet" + ) #print(f" archiveorg processing item={item_name}", file=sys.stderr) item = self.session.get_item(item_name) diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py index c9f182c..6c25276 100644 --- a/python/sandcrawler/fileset_strategies.py +++ b/python/sandcrawler/fileset_strategies.py @@ -1,4 +1,3 @@ - import gzip import json import os @@ -10,15 +9,15 @@ from typing import Any, Dict, List, Optional, Tuple import internetarchive -from sandcrawler.fileset_types import (ArchiveStrategyResult, FilesetManifestFile, FilesetPlatformItem, IngestStrategy, - PlatformScopeError) +from sandcrawler.fileset_types import (ArchiveStrategyResult, FilesetManifestFile, + FilesetPlatformItem, IngestStrategy, PlatformScopeError) from sandcrawler.html_metadata import BiblioMetadata -from sandcrawler.ia import ResourceResult, SavePageNowClient, WaybackClient, fix_transfer_encoding +from sandcrawler.ia import (ResourceResult, SavePageNowClient, WaybackClient, + fix_transfer_encoding) from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path, sanitize_fs_path class FilesetIngestStrategy(): - def __init__(self): #self.ingest_strategy = 'unknown' self.success_status = "success" @@ -31,7 +30,6 @@ class FilesetIngestStrategy(): class ArchiveorgFilesetStrategy(FilesetIngestStrategy): - def __init__(self, **kwargs): super().__init__() self.ingest_strategy = IngestStrategy.ArchiveorgFileset @@ -61,7 +59,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): found = False for existing in item_files: if existing.name == wanted.path: - if ((existing.sha1 and existing.sha1 == wanted.sha1) or (existing.md5 and existing.md5 == wanted.md5)) and 
existing.name == wanted.path and existing.size == wanted.size: + if ((existing.sha1 and existing.sha1 == wanted.sha1) or + (existing.md5 and existing.md5 == wanted.md5) + ) and existing.name == wanted.path and existing.size == wanted.size: found = True wanted.status = 'exists' break @@ -69,7 +69,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): wanted.status = 'mismatch-existing' break if not found: - print(f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}", file=sys.stderr) + print( + f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}", + file=sys.stderr) return None return ArchiveStrategyResult( ingest_strategy=self.ingest_strategy, @@ -108,10 +110,11 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): if not os.path.exists(local_path): print(f" downloading {m.path}", file=sys.stderr) - with self.ia_session.get(m.platform_url, stream=True, allow_redirects=True) as r: + with self.ia_session.get(m.platform_url, stream=True, + allow_redirects=True) as r: r.raise_for_status() with open(local_path + '.partial', 'wb') as f: - for chunk in r.iter_content(chunk_size=256*1024): + for chunk in r.iter_content(chunk_size=256 * 1024): f.write(chunk) os.rename(local_path + '.partial', local_path) m.status = 'downloaded-local' @@ -120,7 +123,8 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): print(f" verifying {m.path}", file=sys.stderr) file_meta = gen_file_metadata_path(local_path, allow_empty=True) - assert file_meta['size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}" + assert file_meta[ + 'size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}" if m.sha1: assert file_meta['sha1hex'] == m.sha1 @@ -142,7 +146,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): if file_meta['mimetype'] != m.mimetype and file_meta['mimetype'] != 'text/plain': # these 'tab-separated-values' from dataverse are just noise, don't log them if m.mimetype != 'text/tab-separated-values': - print(f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}", file=sys.stderr) + print( + f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}", + file=sys.stderr) m.mimetype = file_meta['mimetype'] else: m.mimetype = file_meta['mimetype'] @@ -158,7 +164,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): 'remote_name': m.path, }) - print(f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...", file=sys.stderr) + print( + f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...", + file=sys.stderr) internetarchive.upload( item.archiveorg_item_name, files=item_files, @@ -183,25 +191,26 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy): return result + class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy): """ ArchiveorgFilesetStrategy currently works fine with individual files. Just need to over-ride the ingest_strategy name. 
""" - def __init__(self): super().__init__() self.ingest_strategy = IngestStrategy.ArchiveorgFileset self.success_status = "success-file" -class WebFilesetStrategy(FilesetIngestStrategy): +class WebFilesetStrategy(FilesetIngestStrategy): def __init__(self, **kwargs): super().__init__() self.ingest_strategy = IngestStrategy.WebFileset self.wayback_client = WaybackClient() self.try_spn2 = True - self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) + self.spn_client = SavePageNowClient( + spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) self.max_spn_manifest = 20 def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult: @@ -218,23 +227,26 @@ class WebFilesetStrategy(FilesetIngestStrategy): for m in item.manifest: fetch_url = m.platform_url if not fetch_url: - raise NotImplementedError("require 'platform_url' for each file when doing Web fetching") + raise NotImplementedError( + "require 'platform_url' for each file when doing Web fetching") via = "wayback" resource = self.wayback_client.lookup_resource(fetch_url, m.mimetype) - if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture')): + if self.try_spn2 and (resource == None or + (resource and resource.status == 'no-capture')): if len(item.manifest) > self.max_spn_manifest: m.status = 'too-much-spn' continue via = "spn2" - resource = self.spn_client.crawl_resource(fetch_url, self.wayback_client, force_simple_get=True) + resource = self.spn_client.crawl_resource(fetch_url, + self.wayback_client, + force_simple_get=True) - print("[FETCH {:>6}] {} {}".format( - via, - (resource and resource.status), - (resource and resource.terminal_url) or fetch_url), - file=sys.stderr) + print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status), + (resource and resource.terminal_url) + or fetch_url), + file=sys.stderr) m.terminal_url = resource.terminal_url m.terminal_dt = resource.terminal_dt @@ -251,9 +263,11 @@ class WebFilesetStrategy(FilesetIngestStrategy): file_meta, html_resource = fix_transfer_encoding(file_meta, resource) if self.ingest_strategy == "web-file": - file_file_meta = file_meta + file_file_meta = file_meta - if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex']) or (m.sha1 and m.sha1 != file_meta['sha1hex']): + if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex'] + ) or (m.sha1 + and m.sha1 != file_meta['sha1hex']): m.status = 'mismatch' continue @@ -280,8 +294,8 @@ class WebFilesetStrategy(FilesetIngestStrategy): result.file_resource = file_resource return result -class WebFileStrategy(WebFilesetStrategy): +class WebFileStrategy(WebFilesetStrategy): def __init__(self, **kwargs): super().__init__(**kwargs) self.ingest_strategy = IngestStrategy.WebFile diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py index 8ea136e..606af07 100644 --- a/python/sandcrawler/fileset_types.py +++ b/python/sandcrawler/fileset_types.py @@ -1,4 +1,3 @@ - from enum import Enum from typing import Any, Dict, List, Optional, Tuple @@ -13,6 +12,7 @@ class IngestStrategy(str, Enum): ArchiveorgFileset = "archiveorg-fileset" ArchiveorgFilesetBundled = "archiveorg-fileset-bundled" + class FilesetManifestFile(BaseModel): path: str size: Optional[int] @@ -27,6 +27,7 @@ class FilesetManifestFile(BaseModel): terminal_url: Optional[str] terminal_dt: Optional[str] + class FilesetPlatformItem(BaseModel): platform_name: str platform_status: str @@ -39,6 +40,7 @@ class 
FilesetPlatformItem(BaseModel): web_base_url: Optional[str] web_bundle_url: Optional[str] + class ArchiveStrategyResult(BaseModel): ingest_strategy: str status: str @@ -49,6 +51,7 @@ class ArchiveStrategyResult(BaseModel): bundle_resource: Optional[Any] bundle_archiveorg_path: Optional[str] + class PlatformScopeError(Exception): """ For incidents where platform helper discovers that the fileset/dataset is @@ -61,6 +64,7 @@ class PlatformScopeError(Exception): """ pass + class PlatformRestrictedError(Exception): """ When datasets are not publicly available on a platform (yet) diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py index 5242b3a..16bbb01 100644 --- a/python/sandcrawler/grobid.py +++ b/python/sandcrawler/grobid.py @@ -1,4 +1,3 @@ - import requests from grobid2json import teixml2json @@ -8,7 +7,6 @@ from .workers import SandcrawlerFetchWorker, SandcrawlerWorker class GrobidClient(object): - def __init__(self, host_url="http://grobid.qa.fatcat.wiki", **kwargs): self.host_url = host_url self.consolidate_mode = int(kwargs.get('consolidate_mode', 0)) @@ -34,7 +32,7 @@ class GrobidClient(object): files={ 'input': blob, 'consolidateHeader': self.consolidate_mode, - 'consolidateCitations': 0, # too expensive for now + 'consolidateCitations': 0, # too expensive for now 'includeRawCitations': 1, }, timeout=180.0, @@ -46,9 +44,7 @@ class GrobidClient(object): 'error_msg': 'GROBID request (HTTP POST) timeout', } - info = dict( - status_code=grobid_response.status_code, - ) + info = dict(status_code=grobid_response.status_code, ) if grobid_response.status_code == 200: info['status'] = 'success' info['tei_xml'] = grobid_response.text @@ -56,7 +52,8 @@ class GrobidClient(object): # XML is larger than Kafka message size, and much larger than # an article in general; bail out info['status'] = 'error' - info['error_msg'] = "response XML too large: {} bytes".format(len(info['tei_xml'])) + info['error_msg'] = "response XML too large: {} bytes".format( + len(info['tei_xml'])) info.pop('tei_xml') else: # response.text is .content decoded as utf-8 @@ -70,7 +67,13 @@ class GrobidClient(object): tei_json = teixml2json(result['tei_xml'], encumbered=False) meta = dict() biblio = dict() - for k in ('title', 'authors', 'journal', 'date', 'doi', ): + for k in ( + 'title', + 'authors', + 'journal', + 'date', + 'doi', + ): if tei_json.get(k): biblio[k] = tei_json[k] meta['biblio'] = biblio @@ -79,8 +82,8 @@ class GrobidClient(object): meta[k] = tei_json[k] return meta -class GrobidWorker(SandcrawlerFetchWorker): +class GrobidWorker(SandcrawlerFetchWorker): def __init__(self, grobid_client, wayback_client=None, sink=None, **kwargs): super().__init__(wayback_client=wayback_client) self.grobid_client = grobid_client @@ -104,18 +107,19 @@ class GrobidWorker(SandcrawlerFetchWorker): return fetch_result blob = fetch_result['blob'] - result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode) + result = self.grobid_client.process_fulltext(blob, + consolidate_mode=self.consolidate_mode) result['file_meta'] = gen_file_metadata(blob) result['source'] = record result['key'] = result['file_meta']['sha1hex'] return result + class GrobidBlobWorker(SandcrawlerWorker): """ This is sort of like GrobidWorker, except it receives blobs directly, instead of fetching blobs from some remote store. 
""" - def __init__(self, grobid_client, sink=None, **kwargs): super().__init__() self.grobid_client = grobid_client @@ -125,8 +129,8 @@ class GrobidBlobWorker(SandcrawlerWorker): def process(self, blob, key=None): if not blob: return None - result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode) + result = self.grobid_client.process_fulltext(blob, + consolidate_mode=self.consolidate_mode) result['file_meta'] = gen_file_metadata(blob) result['key'] = result['file_meta']['sha1hex'] return result - diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py index 6bdebdd..a44fc67 100644 --- a/python/sandcrawler/html.py +++ b/python/sandcrawler/html.py @@ -1,4 +1,3 @@ - import json import re import sys @@ -6,7 +5,8 @@ import urllib.parse from bs4 import BeautifulSoup -RESEARCHSQUARE_REGEX = re.compile(r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"') +RESEARCHSQUARE_REGEX = re.compile( + r'"url":"(https://assets.researchsquare.com/files/.{1,50}/v\d+/Manuscript.pdf)"') IEEEXPLORE_REGEX = re.compile(r'"pdfPath":"(/.*?\.pdf)"') OVID_JOURNAL_URL_REGEX = re.compile(r'journalURL = "(http.*)";') SCIENCEDIRECT_BOUNCE_URL_REGEX = re.compile(r"window.location = '(http.*)';") @@ -33,16 +33,16 @@ def extract_fulltext_url(html_url, html_body): ### General Tricks ### # highwire-style meta tag - meta = soup.find('meta', attrs={"name":"citation_pdf_url"}) + meta = soup.find('meta', attrs={"name": "citation_pdf_url"}) if not meta: - meta = soup.find('meta', attrs={"name":"bepress_citation_pdf_url"}) + meta = soup.find('meta', attrs={"name": "bepress_citation_pdf_url"}) if not meta: - meta = soup.find('meta', attrs={"name":"wkhealth_pdf_url"}) + meta = soup.find('meta', attrs={"name": "wkhealth_pdf_url"}) if not meta: # researchgate does this; maybe others also? - meta = soup.find('meta', attrs={"property":"citation_pdf_url"}) + meta = soup.find('meta', attrs={"property": "citation_pdf_url"}) if not meta: - meta = soup.find('meta', attrs={"name":"eprints.document_url"}) + meta = soup.find('meta', attrs={"name": "eprints.document_url"}) # if tag is only partially populated if meta and not meta.get('content'): meta = None @@ -52,10 +52,10 @@ def extract_fulltext_url(html_url, html_body): if '://doi.org/' in url: print(f"\tdoi.org in citation_pdf_url (loop?): {url}", file=sys.stderr) elif url.startswith('/'): - if host_prefix+url == html_url: + if host_prefix + url == html_url: print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr) else: - return dict(pdf_url=host_prefix+url, technique='citation_pdf_url') + return dict(pdf_url=host_prefix + url, technique='citation_pdf_url') elif url.startswith('http'): if url == html_url: print(f"\tavoiding citation_pdf_url link-loop", file=sys.stderr) @@ -64,7 +64,7 @@ def extract_fulltext_url(html_url, html_body): else: print("\tmalformed citation_pdf_url? 
{}".format(url), file=sys.stderr) - meta = soup.find('meta', attrs={"name":"generator"}) + meta = soup.find('meta', attrs={"name": "generator"}) meta_generator = None if meta and meta.get('content'): meta_generator = meta['content'].strip() @@ -105,7 +105,8 @@ def extract_fulltext_url(html_url, html_body): json_meta = json.loads(json_text) pdf_meta = json_meta['article']['pdfDownload']['urlMetadata'] # https://www.sciencedirect.com/science/article/pii/S0169204621000670/pdfft?md5=c4a83d06b334b627ded74cf9423bfa56&pid=1-s2.0-S0169204621000670-main.pdf - url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams']['md5'] + "&pid=" + pdf_meta['queryParams']['pid'] + url = html_url + pdf_meta['pdfExtension'] + "?md5=" + pdf_meta['queryParams'][ + 'md5'] + "&pid=" + pdf_meta['queryParams']['pid'] except (KeyError, TypeError, json.JSONDecodeError): pass if url: @@ -130,7 +131,9 @@ def extract_fulltext_url(html_url, html_body): if m: url = m.group(1) assert len(url) < 4096 - return dict(release_stage="published", pdf_url=host_prefix+url, technique="ieeexplore") + return dict(release_stage="published", + pdf_url=host_prefix + url, + technique="ieeexplore") # https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=8730313 if '://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber' in html_url: # HTML iframe like: @@ -172,11 +175,12 @@ def extract_fulltext_url(html_url, html_body): '://thesiscommons.org/', ] for domain in OSF_DOMAINS: - if domain in html_url and (len(html_url.split('/')) in [4,5] or '/preprints/' in html_url) and '/download' not in html_url: + if domain in html_url and (len(html_url.split('/')) in [4, 5] or '/preprints/' + in html_url) and '/download' not in html_url: if not html_url.endswith("/"): - next_url = html_url+"/download" + next_url = html_url + "/download" else: - next_url = html_url+"download" + next_url = html_url + "download" return dict(next_url=next_url, technique='osf-by-url') # wiley @@ -199,14 +203,14 @@ def extract_fulltext_url(html_url, html_body): url = html_url.replace("/doi/10.", "/doi/pdf/10.") return dict(pdf_url=url, technique='archivist-url') # - hrefs = soup.find_all('a', attrs={"target":"_blank"}) + hrefs = soup.find_all('a', attrs={"target": "_blank"}) for href in hrefs: url = href['href'].strip() if "/doi/pdf/" in url: if url.startswith('http'): return dict(pdf_url=url, technique='publisher-href') elif url.startswith('/'): - return dict(pdf_url=host_prefix+url, technique='publisher-href') + return dict(pdf_url=host_prefix + url, technique='publisher-href') # protocols.io # https://www.protocols.io/view/flow-cytometry-protocol-mgdc3s6 @@ -248,7 +252,8 @@ def extract_fulltext_url(html_url, html_body): if "://ehp.niehs.nih.gov/doi/" in html_url: # if b'/doi/pdf/10.' 
in html_body: - url = html_url.replace('/doi/full/10.', '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.') + url = html_url.replace('/doi/full/10.', + '/doi/pdf/10.').replace('/doi/10.', '/doi/pdf/10.') return dict(pdf_url=url, technique='ehp.niehs.nigh.gov-url') # cogentoa.com @@ -275,7 +280,7 @@ def extract_fulltext_url(html_url, html_body): # http://en.gzbd.cnki.net/gzbt/detail/detail.aspx?FileName=HBGF202002003&DbName=GZBJ7920&DbCode=GZBJ if '://en.gzbd.cnki.net/KCMS/detail/detail.aspx' in html_url: # PDF Download - href = soup.find('a', attrs={"id":"pdfDown"}) + href = soup.find('a', attrs={"id": "pdfDown"}) if href: url = href['href'].strip().replace(' ', '') if not url.startswith('http'): @@ -300,7 +305,7 @@ def extract_fulltext_url(html_url, html_body): # OJS 3 (some) if meta_generator and meta_generator.startswith("Open Journal Systems"): - href = soup.find('a', attrs={"class":"obj_galley_link file"}) + href = soup.find('a', attrs={"class": "obj_galley_link file"}) if href and href.text and "pdf" in href.text.lower(): url = href['href'].strip() if url.startswith('/'): @@ -329,13 +334,15 @@ def extract_fulltext_url(html_url, html_body): return dict() + def test_regex(): lines = """ blah var journalURL = "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689"; asdf""" m = OVID_JOURNAL_URL_REGEX.search(lines) - assert m.group(1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689" + assert m.group( + 1) == "https://journals.lww.com/co-urology/fulltext/10.1097/MOU.0000000000000689" lines = """ window.onload = function () { diff --git a/python/sandcrawler/html_metadata.py b/python/sandcrawler/html_metadata.py index c6725dc..6d27a3a 100644 --- a/python/sandcrawler/html_metadata.py +++ b/python/sandcrawler/html_metadata.py @@ -1,4 +1,3 @@ - import datetime import sys import urllib.parse @@ -31,9 +30,7 @@ HEAD_META_PATTERNS: Any = { "meta[name='dcterms.title']", "meta[name='dc.title']", ], - "subtitle": [ - "meta[name='prism.subtitle']", - ], + "subtitle": ["meta[name='prism.subtitle']", ], "doi": [ "meta[name='citation_doi']", "meta[name='DOI']", @@ -43,9 +40,7 @@ HEAD_META_PATTERNS: Any = { "meta[name='dc.identifier.doi']", "meta[name='dc.identifier'][scheme='doi']", ], - "pmid": [ - "meta[name='citation_pmid']", - ], + "pmid": ["meta[name='citation_pmid']", ], "abstract": [ "meta[name='citation_abstract']", "meta[name='bepress_citation_abstract']", @@ -66,9 +61,7 @@ HEAD_META_PATTERNS: Any = { "meta[name='dc.source']", "meta[property='og:site_name']", ], - "container_abbrev": [ - "meta[name='citation_journal_abbrev']", - ], + "container_abbrev": ["meta[name='citation_journal_abbrev']", ], "raw_date": [ "meta[name='citation_publication_date']", "meta[name='bepress_citation_publication_date']", @@ -169,9 +162,7 @@ HEAD_META_LIST_PATTERNS: Any = { "meta[name='dc.contributor']", ], # TODO: citation_author_institution - "raw_references": [ - "meta[name='citation_reference']", - ], + "raw_references": ["meta[name='citation_reference']", ], "raw_identifiers": [ "meta[name='eprints.id_number']", "meta[name='dcterms.identifier']", @@ -260,7 +251,7 @@ HTML_FULLTEXT_PATTERNS: List[dict] = [ COMPONENT_FULLTEXT_PATTERNS: List[dict] = [ { - "in_doc_url": "pensoft.net/article/", # also /element/ + "in_doc_url": "pensoft.net/article/", # also /element/ "in_fulltext_url": "/download/fig/", "selector": ".Main-Content .figure a.P-Article-Preview-Picture-Download-Small", "attr": "href", @@ -652,12 +643,11 @@ class BiblioMetadata(pydantic.BaseModel): 
component_url: Optional[str] class Config: - json_encoders = { - datetime.date: lambda dt: dt.isoformat() - } + json_encoders = {datetime.date: lambda dt: dt.isoformat()} -def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, patterns: List[dict]) -> Optional[Tuple[str, str]]: +def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, + patterns: List[dict]) -> Optional[Tuple[str, str]]: """ Tries to quickly extract fulltext URLs using a set of patterns. This function is intendend to be generic across various extraction techniques. @@ -701,6 +691,7 @@ def html_extract_fulltext_url(doc_url: str, doc: HTMLParser, patterns: List[dict return self_doc_url return None + def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadata]: meta: Any = dict() @@ -772,6 +763,7 @@ def html_extract_biblio(doc_url: str, doc: HTMLParser) -> Optional[BiblioMetadat return BiblioMetadata(**meta) + def load_adblock_rules() -> braveblock.Adblocker: """ TODO: consider blocking very generic assets: @@ -838,7 +830,8 @@ def _extract_generic(doc: HTMLParser, selector: str, attrs: List[str], type_name return resources -def html_extract_resources(doc_url: str, doc: HTMLParser, adblock: braveblock.Adblocker) -> list: +def html_extract_resources(doc_url: str, doc: HTMLParser, + adblock: braveblock.Adblocker) -> list: """ This function tries to find all the important resources in a page. The presumption is that the HTML document is article fulltext, and we want the @@ -869,10 +862,12 @@ def html_extract_resources(doc_url: str, doc: HTMLParser, adblock: braveblock.Ad r['url'] = urllib.parse.urljoin(doc_url, r['url']) # filter using adblocker - resources = [r for r in resources if adblock.check_network_urls(r['url'], source_url=doc_url, request_type=r['type']) == False] + resources = [ + r for r in resources if adblock.check_network_urls( + r['url'], source_url=doc_url, request_type=r['type']) == False + ] # remove duplicates resources = [dict(t) for t in {tuple(d.items()) for d in resources}] return resources - diff --git a/python/sandcrawler/ia.py b/python/sandcrawler/ia.py index ca1182f..a8ce193 100644 --- a/python/sandcrawler/ia.py +++ b/python/sandcrawler/ia.py @@ -1,4 +1,3 @@ - # XXX: some broken MRO thing going on in here due to python3 object wrangling # in `wayback` library. Means we can't run pylint. 
# pylint: skip-file @@ -38,6 +37,7 @@ class SandcrawlerBackoffError(Exception): """ pass + ResourceResult = namedtuple("ResourceResult", [ "start_url", "hit", @@ -80,6 +80,7 @@ CdxPartial = namedtuple('CdxPartial', [ 'sha1hex', ]) + def cdx_partial_from_row(full): return CdxPartial( surt=full.surt, @@ -91,6 +92,7 @@ def cdx_partial_from_row(full): sha1hex=full.sha1hex, ) + def cdx_to_dict(cdx): d = { "surt": cdx.surt, @@ -107,6 +109,7 @@ def cdx_to_dict(cdx): d['warc_path'] = cdx.warc_path return d + def fuzzy_match_url(left, right): """ Matches URLs agnostic of http/https (and maybe other normalizations in the @@ -123,6 +126,7 @@ def fuzzy_match_url(left, right): return True return False + def test_fuzzy_match_url(): assert fuzzy_match_url("http://thing.com", "http://thing.com") == True assert fuzzy_match_url("http://thing.com", "https://thing.com") == True @@ -137,18 +141,19 @@ def test_fuzzy_match_url(): assert fuzzy_match_url("http://www.thing.com", "http://www2.thing.com") == False assert fuzzy_match_url("http://www.thing.com", "https://www2.thing.com") == False + class CdxApiError(Exception): pass -class CdxApiClient: +class CdxApiClient: def __init__(self, host_url="https://web.archive.org/cdx/search/cdx", **kwargs): self.host_url = host_url self.http_session = requests_retry_session(retries=3, backoff_factor=3) - cdx_auth_token = kwargs.get('cdx_auth_token', - os.environ.get('CDX_AUTH_TOKEN')) + cdx_auth_token = kwargs.get('cdx_auth_token', os.environ.get('CDX_AUTH_TOKEN')) if not cdx_auth_token: - raise Exception("CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)") + raise Exception( + "CDX auth token required (as parameter or environment variable CDX_AUTH_TOKEN)") self.http_session.headers.update({ 'User-Agent': 'Mozilla/5.0 sandcrawler.CdxApiClient', 'Cookie': 'cdx_auth_token={}'.format(cdx_auth_token), @@ -208,7 +213,8 @@ class CdxApiClient: found, because we expect to be looking up a specific full record. """ if len(datetime) != 14: - raise ValueError("CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime)) + raise ValueError( + "CDX fetch requires full 14 digit timestamp. Got: {}".format(datetime)) params = { 'url': url, 'from': datetime, @@ -226,18 +232,28 @@ class CdxApiClient: if retry_sleep > 3: next_sleep = retry_sleep - 3 retry_sleep = 3 - print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr) + print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), + file=sys.stderr) time.sleep(retry_sleep) - return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=next_sleep) + return self.fetch(url, + datetime, + filter_status_code=filter_status_code, + retry_sleep=next_sleep) raise KeyError("CDX url/datetime not found: {} {}".format(url, datetime)) row = resp[0] # allow fuzzy http/https match if not (fuzzy_match_url(row.url, url) and row.datetime == datetime): if retry_sleep and retry_sleep > 0: - print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), file=sys.stderr) + print(" CDX fetch failed; will sleep {}sec and try again".format(retry_sleep), + file=sys.stderr) time.sleep(retry_sleep) - return self.fetch(url, datetime, filter_status_code=filter_status_code, retry_sleep=None) - raise KeyError("Didn't get exact CDX url/datetime match. 
url:{} dt:{} got:{}".format(url, datetime, row)) + return self.fetch(url, + datetime, + filter_status_code=filter_status_code, + retry_sleep=None) + raise KeyError( + "Didn't get exact CDX url/datetime match. url:{} dt:{} got:{}".format( + url, datetime, row)) if filter_status_code: assert row.status_code == filter_status_code return row @@ -311,17 +327,20 @@ class CdxApiClient: class WaybackError(Exception): pass + class WaybackContentError(Exception): pass + class PetaboxError(Exception): pass + class NoCaptureError(Exception): pass -class WaybackClient: +class WaybackClient: def __init__(self, cdx_client=None, **kwargs): if cdx_client: self.cdx_client = cdx_client @@ -367,32 +386,42 @@ class WaybackClient: if not self.petabox_webdata_secret: raise Exception("WaybackClient needs petabox secret to do direct WARC fetches") if not "/" in warc_path: - raise ValueError("what looks like a liveweb/SPN temporary warc path: {}".format(warc_path)) + raise ValueError( + "what looks like a liveweb/SPN temporary warc path: {}".format(warc_path)) warc_uri = self.warc_uri_prefix + warc_path if not self.rstore: - self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory3( - webdata_secret=self.petabox_webdata_secret, - )) + self.rstore = ResourceStore( + loaderfactory=CDXLoaderFactory3(webdata_secret=self.petabox_webdata_secret, )) try: #print("offset: {} csize: {} uri: {}".format(offset, csize, warc_uri), file=sys.stderr) gwb_record = self.rstore.load_resource(warc_uri, offset, csize) except wayback.exception.ResourceUnavailable: print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr) - raise PetaboxError("failed to load file contents from wayback/petabox (ResourceUnavailable)") + raise PetaboxError( + "failed to load file contents from wayback/petabox (ResourceUnavailable)") except wayback.exception.InvalidResource: print(" Failed to fetch from warc_path:{}".format(warc_path), file=sys.stderr) - raise WaybackContentError("failed to load file contents from wayback/petabox (InvalidResource)") + raise WaybackContentError( + "failed to load file contents from wayback/petabox (InvalidResource)") except urllib3.exceptions.ReadTimeoutError as rte: - raise PetaboxError("failed to load file contents from wayback/petabox (ReadTimeoutError: {})".format(rte)) + raise PetaboxError( + "failed to load file contents from wayback/petabox (ReadTimeoutError: {})". 
+ format(rte)) except ValueError as ve: - raise PetaboxError("failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) + raise PetaboxError( + "failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) except EOFError as eofe: - raise PetaboxError("failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) + raise PetaboxError( + "failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) except TypeError as te: - raise PetaboxError("failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) + raise PetaboxError( + "failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)" + .format(te)) except Exception as e: if "while decompressing data: invalid block type" in str(e): - raise PetaboxError("decompression error fetching WARC record; usually due to bad alexa ARC files") + raise PetaboxError( + "decompression error fetching WARC record; usually due to bad alexa ARC files" + ) else: raise e # Note: could consider a generic "except Exception" here, as we get so @@ -405,7 +434,8 @@ class WaybackClient: raise WaybackContentError("too many HTTP headers (in wayback fetch)") location = gwb_record.get_location() or None - if status_code is None and gwb_record.target_uri.startswith(b"ftp://") and not gwb_record.is_revisit(): + if status_code is None and gwb_record.target_uri.startswith( + b"ftp://") and not gwb_record.is_revisit(): # TODO: some additional verification here? status_code = 226 @@ -416,8 +446,9 @@ class WaybackClient: raise WaybackContentError("found revisit record, but won't resolve (loop?)") revisit_uri, revisit_dt = gwb_record.refers_to if not (revisit_uri and revisit_dt): - raise WaybackContentError("revisit record missing URI and/or DT: warc:{} offset:{}".format( - warc_path, offset)) + raise WaybackContentError( + "revisit record missing URI and/or DT: warc:{} offset:{}".format( + warc_path, offset)) # convert revisit_dt # len("2018-07-24T11:56:49"), or with "Z" assert len(revisit_dt) in (19, 20) @@ -425,7 +456,9 @@ class WaybackClient: revisit_uri = revisit_uri.decode('utf-8') if type(revisit_dt) is bytes: revisit_dt = revisit_dt.decode('utf-8') - revisit_dt = revisit_dt.replace('-', '').replace(':', '').replace('T', '').replace('Z', '') + revisit_dt = revisit_dt.replace('-', '').replace(':', + '').replace('T', + '').replace('Z', '') assert len(revisit_dt) == 14 try: revisit_cdx = self.cdx_client.fetch(revisit_uri, revisit_dt) @@ -443,10 +476,10 @@ class WaybackClient: body = gwb_record.open_raw_content().read() except IncompleteRead as ire: raise WaybackError( - "failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) + "failed to read actual file contents from wayback/petabox (IncompleteRead: {})" + .format(ire)) elif status_code is None: - raise WaybackContentError( - "got a None status_code in (W)ARC record") + raise WaybackContentError("got a None status_code in (W)ARC record") return WarcResource( status_code=status_code, location=location, @@ -454,7 +487,12 @@ class WaybackClient: revisit_cdx=revisit_cdx, ) - def fetch_petabox_body(self, csize, offset, warc_path, resolve_revisit=True, expected_status_code=None): + def fetch_petabox_body(self, + csize, + offset, + warc_path, + resolve_revisit=True, + expected_status_code=None): """ Fetches HTTP 200 WARC resource directly from petabox using WARC path/offset/csize. 
@@ -474,12 +512,10 @@ class WaybackClient: raise KeyError("archived HTTP response (WARC) was not {}: {}".format( expected_status_code, resource.status_code, - ) - ) + )) elif resource.status_code not in (200, 226): raise KeyError("archived HTTP response (WARC) was not 200: {}".format( - resource.status_code) - ) + resource.status_code)) return resource.body @@ -514,7 +550,9 @@ class WaybackClient: except requests.exceptions.ChunkedEncodingError: raise WaybackError("ChunkedEncodingError (wayback replay fetch)") except UnicodeDecodeError: - raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url)) + raise WaybackContentError( + "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format( + url)) try: resp.raise_for_status() @@ -526,21 +564,20 @@ class WaybackClient: if not "X-Archive-Src" in resp.headers: raise WaybackError("replay fetch didn't return X-Archive-Src in headers") if not datetime in resp.url: - raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url)) + raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format( + datetime, resp.url)) if cdx_sha1hex: # verify that body matches CDX hash # TODO: don't need *all* these hashes, just sha1 file_meta = gen_file_metadata(resp.content) if cdx_sha1hex != file_meta['sha1hex']: - print(" REPLAY MISMATCH: cdx:{} replay:{}".format( - cdx_sha1hex, - file_meta['sha1hex']), - file=sys.stderr) - raise WaybackContentError("replay fetch body didn't match CDX hash cdx:{} body:{}".format( - cdx_sha1hex, - file_meta['sha1hex']), - ) + print(" REPLAY MISMATCH: cdx:{} replay:{}".format(cdx_sha1hex, + file_meta['sha1hex']), + file=sys.stderr) + raise WaybackContentError( + "replay fetch body didn't match CDX hash cdx:{} body:{}".format( + cdx_sha1hex, file_meta['sha1hex']), ) return resp.content def fetch_replay_redirect(self, url, datetime): @@ -568,7 +605,9 @@ class WaybackClient: except requests.exceptions.TooManyRedirects: raise WaybackContentError("redirect loop (wayback replay fetch)") except UnicodeDecodeError: - raise WaybackContentError("UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format(url)) + raise WaybackContentError( + "UnicodeDecodeError in replay request (can mean nasty redirect URL): {}".format( + url)) try: resp.raise_for_status() except Exception as e: @@ -580,7 +619,8 @@ class WaybackClient: if not "X-Archive-Src" in resp.headers: raise WaybackError("redirect replay fetch didn't return X-Archive-Src in headers") if not datetime in resp.url: - raise WaybackError("didn't get exact reply (redirect?) datetime:{} got:{}".format(datetime, resp.url)) + raise WaybackError("didn't get exact reply (redirect?) 
datetime:{} got:{}".format( + datetime, resp.url)) redirect_url = resp.headers.get("Location") # eg, https://web.archive.org/web/20200111003923id_/https://dx.doi.org/10.17504/protocols.io.y2gfybw @@ -622,7 +662,9 @@ class WaybackClient: urls_seen = [start_url] for i in range(self.max_redirects): print(" URL: {}".format(next_url), file=sys.stderr) - cdx_row = self.cdx_client.lookup_best(next_url, best_mimetype=best_mimetype, closest=closest) + cdx_row = self.cdx_client.lookup_best(next_url, + best_mimetype=best_mimetype, + closest=closest) #print(cdx_row, file=sys.stderr) if not cdx_row: return ResourceResult( @@ -776,9 +818,11 @@ class WaybackClient: class SavePageNowError(Exception): pass + class SavePageNowBackoffError(SandcrawlerBackoffError): pass + SavePageNowResult = namedtuple('SavePageNowResult', [ 'success', 'status', @@ -789,13 +833,11 @@ SavePageNowResult = namedtuple('SavePageNowResult', [ 'resources', ]) -class SavePageNowClient: +class SavePageNowClient: def __init__(self, v2endpoint="https://web.archive.org/save", **kwargs): - self.ia_access_key = kwargs.get('ia_access_key', - os.environ.get('IA_ACCESS_KEY')) - self.ia_secret_key = kwargs.get('ia_secret_key', - os.environ.get('IA_SECRET_KEY')) + self.ia_access_key = kwargs.get('ia_access_key', os.environ.get('IA_ACCESS_KEY')) + self.ia_secret_key = kwargs.get('ia_secret_key', os.environ.get('IA_SECRET_KEY')) self.v2endpoint = v2endpoint self.v2_session = requests_retry_session(retries=5, backoff_factor=3) self.v2_session.headers.update({ @@ -886,12 +928,15 @@ class SavePageNowClient: }, ) if resp.status_code == 429: - raise SavePageNowBackoffError("status_code: {}, url: {}".format(resp.status_code, request_url)) + raise SavePageNowBackoffError("status_code: {}, url: {}".format( + resp.status_code, request_url)) elif resp.status_code != 200: - raise SavePageNowError("SPN2 status_code: {}, url: {}".format(resp.status_code, request_url)) + raise SavePageNowError("SPN2 status_code: {}, url: {}".format( + resp.status_code, request_url)) resp_json = resp.json() - if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json['message']: + if resp_json and 'message' in resp_json and 'You have already reached the limit of active sessions' in resp_json[ + 'message']: raise SavePageNowBackoffError(resp_json['message']) elif not resp_json or 'job_id' not in resp_json or not resp_json['job_id']: raise SavePageNowError( @@ -915,7 +960,8 @@ class SavePageNowClient: final_json = resp.json() break else: - raise SavePageNowError("Unknown SPN2 status:{} url:{}".format(status, request_url)) + raise SavePageNowError("Unknown SPN2 status:{} url:{}".format( + status, request_url)) if not final_json: raise SavePageNowError("SPN2 timed out (polling count exceeded)") @@ -923,8 +969,10 @@ class SavePageNowClient: # if there was a recent crawl of same URL, fetch the status of that # crawl to get correct datetime if final_json.get('original_job_id'): - print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", file=sys.stderr) - resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, final_json['original_job_id'])) + print(f" SPN recent capture: {job_id} -> {final_json['original_job_id']}", + file=sys.stderr) + resp = self.v2_session.get("{}/status/{}".format(self.v2endpoint, + final_json['original_job_id'])) try: resp.raise_for_status() except: @@ -935,7 +983,8 @@ class SavePageNowClient: if final_json['status'] == "success": if final_json.get('original_url').startswith('/'): 
- print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", file=sys.stderr) + print(f" truncateded URL in JSON: {request_url} {json.dumps(final_json)}", + file=sys.stderr) return SavePageNowResult( True, "success", @@ -969,15 +1018,17 @@ class SavePageNowClient: # HACK: capture CNKI domains with outlinks (for COVID-19 crawling) if 'gzbd.cnki.net/' in start_url: - spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get, capture_outlinks=1) + spn_result = self.save_url_now_v2(start_url, + force_simple_get=force_simple_get, + capture_outlinks=1) else: spn_result = self.save_url_now_v2(start_url, force_simple_get=force_simple_get) if not spn_result.success: status = spn_result.status if status in ("error:invalid-url", "error:not-found", - "error:invalid-host-resolution", "error:gateway-timeout", - "error:too-many-redirects", "error:read-timeout"): + "error:invalid-host-resolution", "error:gateway-timeout", + "error:too-many-redirects", "error:read-timeout"): status = status.replace("error:", "") elif status in ("error:no-access", "error:forbidden"): status = "forbidden" @@ -988,7 +1039,8 @@ class SavePageNowClient: elif status.startswith("error:"): status = "spn2-" + status # despite other errors, call these a failure (so we don't retry) - if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1")): + if spn_result.terminal_url and (spn_result.terminal_url.endswith('/cookieAbsent') + or spn_result.terminal_url.endswith("cookieSet=1")): status = "blocked-cookie" return ResourceResult( start_url=start_url, @@ -1018,7 +1070,8 @@ class SavePageNowClient: ) # don't try to CDX fetch for this common cookie block terminal - if spn_result.terminal_url.endswith('/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"): + if spn_result.terminal_url.endswith( + '/cookieAbsent') or spn_result.terminal_url.endswith("cookieSet=1"): return ResourceResult( start_url=start_url, hit=False, @@ -1143,9 +1196,12 @@ class SavePageNowClient: ) -def fix_transfer_encoding(file_meta: dict, resource: ResourceResult) -> Tuple[dict, ResourceResult]: - if resource.body and file_meta['mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip': - print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype), file=sys.stderr) +def fix_transfer_encoding(file_meta: dict, + resource: ResourceResult) -> Tuple[dict, ResourceResult]: + if resource.body and file_meta[ + 'mimetype'] == 'application/gzip' and resource.cdx and resource.cdx.mimetype != 'application/gzip': + print(" transfer encoding not stripped: {}".format(resource.cdx.mimetype), + file=sys.stderr) inner_body = gzip.decompress(resource.body) if not inner_body: raise Exception("null body inside transfer encoding") diff --git a/python/sandcrawler/ingest_file.py b/python/sandcrawler/ingest_file.py index 137a793..b480cc2 100644 --- a/python/sandcrawler/ingest_file.py +++ b/python/sandcrawler/ingest_file.py @@ -1,4 +1,3 @@ - import base64 import gzip import json @@ -15,18 +14,22 @@ from selectolax.parser import HTMLParser from sandcrawler.db import SandcrawlerPostgrestClient from sandcrawler.grobid import GrobidClient from sandcrawler.html import extract_fulltext_url -from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules -from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, ResourceResult, SavePageNowClient, 
- SavePageNowError, WaybackClient, WaybackContentError, WaybackError, cdx_to_dict, +from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio, + html_extract_resources, load_adblock_rules) +from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, + ResourceResult, SavePageNowClient, SavePageNowError, WaybackClient, + WaybackContentError, WaybackError, cdx_to_dict, fix_transfer_encoding) -from sandcrawler.ingest_html import (WebResource, fetch_html_resources, html_extract_body_teixml, html_guess_platform, +from sandcrawler.ingest_html import (WebResource, fetch_html_resources, + html_extract_body_teixml, html_guess_platform, html_guess_scope, quick_fetch_html_resources) from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime from sandcrawler.pdfextract import PdfExtractResult, process_pdf from sandcrawler.workers import SandcrawlerWorker from sandcrawler.xml import xml_reserialize -MAX_BODY_SIZE_BYTES = 128*1024*1024 +MAX_BODY_SIZE_BYTES = 128 * 1024 * 1024 + class IngestFileWorker(SandcrawlerWorker): """ @@ -54,7 +57,6 @@ class IngestFileWorker(SandcrawlerWorker): process_file_hit(ResourceResult) -> response process_grobid(ResourceResult) """ - def __init__(self, sink=None, **kwargs): super().__init__() @@ -64,7 +66,8 @@ class IngestFileWorker(SandcrawlerWorker): self.wayback_client = WaybackClient() self.spn_client = kwargs.get('spn_client') if not self.spn_client: - self.spn_client = SavePageNowClient(spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) + self.spn_client = SavePageNowClient( + spn_cdx_retry_sec=kwargs.get('spn_cdx_retry_sec', 9.0)) self.grobid_client = kwargs.get('grobid_client') if not self.grobid_client: self.grobid_client = GrobidClient() @@ -123,13 +126,13 @@ class IngestFileWorker(SandcrawlerWorker): "fao.org/glis/", # Historical non-paper content: - "dhz.uni-passau.de/", # newspapers - "digital.ucd.ie/", # ireland national historical + "dhz.uni-passau.de/", # newspapers + "digital.ucd.ie/", # ireland national historical # DOI prefixes - "doi.org/10.2307/", # JSTOR; slow and many redirects - "doi.org/10.18730/", # fao.org: database entry - "doi.org/10.15468/", # gbif.org: database entry + "doi.org/10.2307/", # JSTOR; slow and many redirects + "doi.org/10.18730/", # fao.org: database entry + "doi.org/10.15468/", # gbif.org: database entry # deprecated domain (doesn't redirect correctly) "://edoc.mpg.de/", @@ -173,10 +176,10 @@ class IngestFileWorker(SandcrawlerWorker): "video/mpeg", "text/plain", "text/csv", - "text/x-r-source", # dataverse - "text/tab-separated-values", # dataverse - "text/x-rst", # dataverse - "application/x-rlang-transport", # dataverse + "text/x-r-source", # dataverse + "text/tab-separated-values", # dataverse + "text/x-rst", # dataverse + "application/x-rlang-transport", # dataverse "application/json", "application/xml", "application/pdf", @@ -194,7 +197,6 @@ class IngestFileWorker(SandcrawlerWorker): "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ] - def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]: """ Check in sandcrawler-db (postgres) to see if we have already ingested @@ -214,7 +216,10 @@ class IngestFileWorker(SandcrawlerWorker): else: return None - def find_resource(self, url, best_mimetype=None, force_recrawl=False) -> Optional[ResourceResult]: + def find_resource(self, + url, + best_mimetype=None, + force_recrawl=False) -> Optional[ResourceResult]: """ Looks in wayback for a resource starting at the URL, 
following any redirects. If a hit isn't found, try crawling with SPN. @@ -222,7 +227,8 @@ class IngestFileWorker(SandcrawlerWorker): via = "none" resource = None - if url.startswith("http://web.archive.org/web/") or url.startswith("https://web.archive.org/web/"): + if url.startswith("http://web.archive.org/web/") or url.startswith( + "https://web.archive.org/web/"): raise NotImplementedError("handling direct wayback links not supported yet") if url.startswith("http://archive.org/") or url.startswith("https://archive.org/"): @@ -243,14 +249,13 @@ class IngestFileWorker(SandcrawlerWorker): if resource and not resource.hit and resource.terminal_dt and resource.terminal_dt < '20190000000000': old_failure = True - if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') or soft404 or old_failure): + if self.try_spn2 and (resource == None or (resource and resource.status == 'no-capture') + or soft404 or old_failure): via = "spn2" resource = self.spn_client.crawl_resource(url, self.wayback_client) - print("[FETCH {:>6}] {} {}".format( - via, - (resource and resource.status), - (resource and resource.terminal_url) or url), - file=sys.stderr) + print("[FETCH {:>6}] {} {}".format(via, (resource and resource.status), + (resource and resource.terminal_url) or url), + file=sys.stderr) return resource def process_existing(self, request: dict, result_row: dict) -> dict: @@ -262,7 +267,8 @@ class IngestFileWorker(SandcrawlerWorker): assert result_row['hit'] existing_file_meta = self.pgrest_client.get_file_meta(result_row['terminal_sha1hex']) existing_grobid = self.pgrest_client.get_grobid(result_row['terminal_sha1hex']) - existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], result_row['terminal_dt']) + existing_cdx = self.pgrest_client.get_cdx(result_row['terminal_url'], + result_row['terminal_dt']) if not (existing_file_meta and existing_grobid and existing_cdx): raise NotImplementedError("partially-exsiting records not implemented yet") result = { @@ -281,11 +287,13 @@ class IngestFileWorker(SandcrawlerWorker): } return result - def process_file_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict: + def process_file_hit(self, ingest_type: str, resource: ResourceResult, + file_meta: dict) -> dict: """ Run all the necessary processing for a new/fresh ingest hit. 
""" - if ingest_type in ["dataset-file", "component"] and file_meta['mimetype'] == "application/pdf": + if ingest_type in ["dataset-file", "component" + ] and file_meta['mimetype'] == "application/pdf": ingest_type = "pdf" if ingest_type == "pdf": return { @@ -396,24 +404,26 @@ class IngestFileWorker(SandcrawlerWorker): try: html_doc = HTMLParser(resource.body) except ValueError as ve: - return dict( - status="html-selectolax-error", - ) + return dict(status="html-selectolax-error", ) html_biblio = html_extract_biblio(resource.terminal_url, html_doc) assert html_biblio html_body = html_extract_body_teixml(resource.body) html_platform = html_guess_platform(resource.terminal_url, html_doc, html_biblio) - html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, html_body.get('word_count')) + html_scope = html_guess_scope(resource.terminal_url, html_doc, html_biblio, + html_body.get('word_count')) html_biblio_dict = json.loads(html_biblio.json(exclude_none=True)) - if html_scope in ('blocked-captcha','blocked-cookie','blocked-forbidden'): + if html_scope in ('blocked-captcha', 'blocked-cookie', 'blocked-forbidden'): return dict( status=html_scope, html_biblio=html_biblio_dict, scope=html_scope, platform=html_platform, ) - elif html_scope not in ('article-fulltext','unknown',): + elif html_scope not in ( + 'article-fulltext', + 'unknown', + ): html_body.pop("tei_xml", None) return dict( status="wrong-scope", @@ -423,7 +433,8 @@ class IngestFileWorker(SandcrawlerWorker): html_body=html_body, ) - raw_resources = html_extract_resources(resource.terminal_url, html_doc, self.adblock_rules) + raw_resources = html_extract_resources(resource.terminal_url, html_doc, + self.adblock_rules) if len(raw_resources) > self.max_html_resources: html_body.pop("tei_xml", None) return dict( @@ -452,7 +463,9 @@ class IngestFileWorker(SandcrawlerWorker): try: if self.html_quick_mode: print(" WARN: running quick CDX-only fetches", file=sys.stderr) - full_resources = quick_fetch_html_resources(raw_resources, self.wayback_client.cdx_client, when) + full_resources = quick_fetch_html_resources(raw_resources, + self.wayback_client.cdx_client, + when) else: full_resources = fetch_html_resources(raw_resources, self.wayback_client, when) except PetaboxError as e: @@ -572,7 +585,9 @@ class IngestFileWorker(SandcrawlerWorker): return result try: - resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl) + resource = self.find_resource(next_url, + best_mimetype, + force_recrawl=force_recrawl) except SavePageNowError as e: result['status'] = 'spn2-error' result['error_message'] = str(e)[:1600] @@ -650,10 +665,9 @@ class IngestFileWorker(SandcrawlerWorker): # here we split based on ingest type to try and extract a next hop html_ish_resource = bool( "html" in file_meta['mimetype'] - or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" + or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" or "application/xml" in file_meta['mimetype'] - or "text/xml" in file_meta['mimetype'] - ) + or "text/xml" in file_meta['mimetype']) html_biblio = None html_doc = None if html_ish_resource and resource.body: @@ -662,7 +676,8 @@ class IngestFileWorker(SandcrawlerWorker): html_biblio = html_extract_biblio(resource.terminal_url, html_doc) if html_biblio: if not 'html_biblio' in result or html_biblio.title: - result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True)) + result['html_biblio'] = json.loads( + html_biblio.json(exclude_none=True)) #print(f" setting 
html_biblio: {result['html_biblio']}", file=sys.stderr) except ValueError: pass @@ -686,18 +701,19 @@ class IngestFileWorker(SandcrawlerWorker): assert next_url next_url = clean_url(next_url) print("[PARSE {:>6}] {} {}".format( - ingest_type, - fulltext_url.get('technique'), - next_url, - ), - file=sys.stderr) + ingest_type, + fulltext_url.get('technique'), + next_url, + ), + file=sys.stderr) if next_url in hops: result['status'] = 'link-loop' result['error_message'] = "repeated: {}".format(next_url) return result hops.append(next_url) continue - elif ingest_type in ("xml", "html", "component") and html_ish_resource and html_biblio: + elif ingest_type in ("xml", "html", + "component") and html_ish_resource and html_biblio: # NOTE: src_fulltext_url is not a thing next_url_found = None if ingest_type == "xml" and html_biblio.xml_fulltext_url: @@ -711,11 +727,11 @@ class IngestFileWorker(SandcrawlerWorker): next_url = next_url_found technique = "html_biblio" print("[PARSE {:>6}] {} {}".format( - ingest_type, - technique, - next_url, - ), - file=sys.stderr) + ingest_type, + technique, + next_url, + ), + file=sys.stderr) if next_url in hops: if ingest_type == "html": # for HTML ingest, we don't count this as a link-loop @@ -756,7 +772,8 @@ class IngestFileWorker(SandcrawlerWorker): result['status'] = "wrong-mimetype" # formerly: "other-mimetype" return result elif ingest_type == "xml": - if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"): + if file_meta['mimetype'] not in ("application/xml", "text/xml", + "application/jats+xml"): result['status'] = "wrong-mimetype" return result elif ingest_type == "html": @@ -786,18 +803,18 @@ class IngestFileWorker(SandcrawlerWorker): result['hit'] = True if ingest_type == "pdf": print("[SUCCESS {:>5}] sha1:{} grobid:{} pdfextract:{}".format( - ingest_type, - result.get('file_meta', {}).get('sha1hex'), - result.get('grobid', {}).get('status_code'), - result.get('pdf_meta', {}).get('status'), - ), - file=sys.stderr) + ingest_type, + result.get('file_meta', {}).get('sha1hex'), + result.get('grobid', {}).get('status_code'), + result.get('pdf_meta', {}).get('status'), + ), + file=sys.stderr) else: print("[SUCCESS {:>5}] sha1:{}".format( - ingest_type, - result.get('file_meta', {}).get('sha1hex'), - ), - file=sys.stderr) + ingest_type, + result.get('file_meta', {}).get('sha1hex'), + ), + file=sys.stderr) return result diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py index 11386df..5cbb908 100644 --- a/python/sandcrawler/ingest_fileset.py +++ b/python/sandcrawler/ingest_fileset.py @@ -1,4 +1,3 @@ - import gzip import json import sys @@ -14,17 +13,21 @@ from sandcrawler.fileset_platforms import DATASET_PLATFORM_HELPER_TABLE, Fileset from sandcrawler.fileset_strategies import FILESET_STRATEGY_HELPER_TABLE, FilesetIngestStrategy from sandcrawler.fileset_types import PlatformRestrictedError, PlatformScopeError from sandcrawler.html import extract_fulltext_url -from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules -from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, ResourceResult, SavePageNowClient, - SavePageNowError, WaybackClient, WaybackContentError, WaybackError, cdx_to_dict, +from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio, + html_extract_resources, load_adblock_rules) +from sandcrawler.ia import (CdxApiClient, CdxApiError, NoCaptureError, PetaboxError, + 
ResourceResult, SavePageNowClient, SavePageNowError, WaybackClient, + WaybackContentError, WaybackError, cdx_to_dict, fix_transfer_encoding) from sandcrawler.ingest_file import IngestFileWorker -from sandcrawler.ingest_html import (WebResource, fetch_html_resources, html_extract_body_teixml, html_guess_platform, +from sandcrawler.ingest_html import (WebResource, fetch_html_resources, + html_extract_body_teixml, html_guess_platform, html_guess_scope, quick_fetch_html_resources) from sandcrawler.misc import clean_url, gen_file_metadata, parse_cdx_datetime from sandcrawler.workers import SandcrawlerWorker -MAX_BODY_SIZE_BYTES = 128*1024*1024 +MAX_BODY_SIZE_BYTES = 128 * 1024 * 1024 + class IngestFilesetWorker(IngestFileWorker): """ @@ -39,14 +42,13 @@ class IngestFilesetWorker(IngestFileWorker): checking to see if content has been archived already) 4. summarize status """ - def __init__(self, sink=None, **kwargs): super().__init__(sink=None, **kwargs) self.sink = sink self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE - self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024) + self.max_total_size = kwargs.get('max_total_size', 64 * 1024 * 1024 * 1024) self.max_file_count = kwargs.get('max_file_count', 200) self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink') self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False) @@ -72,11 +74,12 @@ class IngestFilesetWorker(IngestFileWorker): raise NotImplementedError("process_existing() not tested or safe yet") def want(self, request: dict) -> bool: - if not request.get('ingest_type') in ('dataset',): + if not request.get('ingest_type') in ('dataset', ): return False return True - def fetch_resource_iteratively(self, ingest_type: str, base_url: str, force_recrawl: bool) -> dict: + def fetch_resource_iteratively(self, ingest_type: str, base_url: str, + force_recrawl: bool) -> dict: """ This is copypasta from process_file(), should probably refactor. 
""" @@ -174,10 +177,9 @@ class IngestFilesetWorker(IngestFileWorker): # here we split based on ingest type to try and extract a next hop html_ish_resource = bool( "html" in file_meta['mimetype'] - or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" + or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml" or "application/xml" in file_meta['mimetype'] - or "text/xml" in file_meta['mimetype'] - ) + or "text/xml" in file_meta['mimetype']) html_biblio = None html_doc = None if html_ish_resource and resource.body: @@ -186,7 +188,8 @@ class IngestFilesetWorker(IngestFileWorker): html_biblio = html_extract_biblio(resource.terminal_url, html_doc) if html_biblio: if not 'html_biblio' in result or html_biblio.title: - result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True)) + result['html_biblio'] = json.loads( + html_biblio.json(exclude_none=True)) #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr) except ValueError: pass @@ -214,7 +217,8 @@ class IngestFilesetWorker(IngestFileWorker): result['status'] = "wrong-mimetype" # formerly: "other-mimetype" return result elif ingest_type == "xml": - if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"): + if file_meta['mimetype'] not in ("application/xml", "text/xml", + "application/jats+xml"): result['status'] = "wrong-mimetype" return result elif ingest_type == "html": @@ -229,11 +233,10 @@ class IngestFilesetWorker(IngestFileWorker): result['_resource'] = resource return result - def process(self, request: dict, key: Any = None) -> dict: ingest_type = request.get('ingest_type') - if ingest_type not in ("dataset",): + if ingest_type not in ("dataset", ): raise NotImplementedError(f"can't handle ingest_type={ingest_type}") # parse/clean URL @@ -250,7 +253,9 @@ class IngestFilesetWorker(IngestFileWorker): #if existing: # return self.process_existing(request, existing) - result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl) + result = self.fetch_resource_iteratively(ingest_type, + base_url, + force_recrawl=force_recrawl) result['request'] = request if result.get('status') != None: result['request'] = request @@ -323,14 +328,16 @@ class IngestFilesetWorker(IngestFileWorker): return result if result['file_count'] > self.max_file_count: # hard max, to prevent downstream breakage - if result['file_count'] > 10*1000: + if result['file_count'] > 10 * 1000: result['manifest'] = result['manifest'][:self.max_file_count] result['status'] = 'too-many-files' return result ingest_strategy = platform_helper.chose_strategy(dataset_meta) result['ingest_strategy'] = ingest_strategy - print(f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}", file=sys.stderr) + print( + f"[PLATFORM {platform}] id={dataset_meta.platform_id} file_count={result['file_count']} total_size={result['total_size']} strategy={ingest_strategy}", + file=sys.stderr) strategy_helper = self.dataset_strategy_archivers.get(ingest_strategy) if not strategy_helper: @@ -349,7 +356,8 @@ class IngestFilesetWorker(IngestFileWorker): if archive_result.bundle_file_meta: result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta if archive_result.archiveorg_bundle_path: - result['fileset_bundle']['archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path + result['fileset_bundle'][ + 'archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path if 
archive_result.bundle_resource: result['fileset_bundle']['terminal'] = dict( terminal_url=archive_result.bundle_resource.terminal_url, @@ -357,14 +365,16 @@ class IngestFilesetWorker(IngestFileWorker): terminal_status_code=archive_result.bundle_resource.terminal_status_code, ) if archive_result.bundle_resource.cdx: - result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_resource.cdx) + result['fileset_bundle']['cdx'] = cdx_to_dict( + archive_result.bundle_resource.cdx) if archive_result.bundle_resource.revisit_cdx: - result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_resource.revisit_cdx) + result['fileset_bundle']['revisit_cdx'] = cdx_to_dict( + archive_result.bundle_resource.revisit_cdx) if ingest_strategy.endswith('-file'): result['fileset_file'] = dict() if archive_result.file_file_meta: - result['fileset_file']['file_meta'] = file_meta=archive_result.file_file_meta, + result['fileset_file']['file_meta'] = file_meta = archive_result.file_file_meta, if archive_result.file_resource: result['fileset_file']['terminal'] = dict( terminal_url=archive_result.file_resource.terminal_url, @@ -372,16 +382,20 @@ class IngestFilesetWorker(IngestFileWorker): terminal_status_code=archive_result.file_resource.terminal_status_code, ) if archive_result.file_resource.cdx: - result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_resource.cdx) + result['fileset_file']['cdx'] = cdx_to_dict( + archive_result.file_resource.cdx) if archive_result.file_resource.revisit_cdx: - result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx) + result['fileset_file']['revisit_cdx'] = cdx_to_dict( + archive_result.file_resource.revisit_cdx) if result['status'].startswith('success'): # check that these are still valid assert result['file_count'] == len(archive_result.manifest) - assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size]) + assert result['total_size'] == sum( + [m.size for m in archive_result.manifest if m.size]) - if result['status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta: + if result[ + 'status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta: file_result = dict( hit=True, status='success', @@ -397,10 +411,13 @@ class IngestFilesetWorker(IngestFileWorker): if archive_result.file_resource.cdx: file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx) if archive_result.file_resource.revisit_cdx: - file_result['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx) + file_result['revisit_cdx'] = cdx_to_dict( + archive_result.file_resource.revisit_cdx) file_result['request']['ingest_type'] = request['ingest_type'] + "-file" # call the super() (ingest_file) version of process_hit() - info = self.process_file_hit(file_result['request']['ingest_type'], archive_result.file_resource, archive_result.file_file_meta) + info = self.process_file_hit(file_result['request']['ingest_type'], + archive_result.file_resource, + archive_result.file_file_meta) file_result.update(info) if self.ingest_file_result_sink: self.ingest_file_result_sink.push_record(result.copy()) @@ -410,17 +427,19 @@ class IngestFilesetWorker(IngestFileWorker): if result['status'].startswith('success'): result['hit'] = True print("[SUCCESS {:>5}] file_count={} total_size={} strategy={}".format( - ingest_type, - result['file_count'], - result['total_size'], - ingest_strategy, - ), file=sys.stderr) + ingest_type, + result['file_count'], + 
result['total_size'], + ingest_strategy, + ), + file=sys.stderr) else: print("[FAIL {:>5}] status={} file_count={} total_size={} strategy={}".format( - ingest_type, - result['status'], - result['file_count'], - result['total_size'], - ingest_strategy, - ), file=sys.stderr) + ingest_type, + result['status'], + result['file_count'], + result['total_size'], + ingest_strategy, + ), + file=sys.stderr) return result diff --git a/python/sandcrawler/ingest_html.py b/python/sandcrawler/ingest_html.py index 9c72dd5..bf25d5d 100644 --- a/python/sandcrawler/ingest_html.py +++ b/python/sandcrawler/ingest_html.py @@ -1,4 +1,3 @@ - import argparse import datetime import io @@ -11,16 +10,20 @@ import pydantic import trafilatura from selectolax.parser import HTMLParser -from sandcrawler.html_metadata import BiblioMetadata, html_extract_biblio, html_extract_resources, load_adblock_rules -from sandcrawler.ia import (CdxApiClient, NoCaptureError, ResourceResult, WaybackClient, WaybackContentError, - cdx_to_dict, fix_transfer_encoding) -from sandcrawler.misc import clean_url, datetime_to_cdx, gen_file_metadata, parse_cdx_datetime, url_fuzzy_equal +from sandcrawler.html_metadata import (BiblioMetadata, html_extract_biblio, + html_extract_resources, load_adblock_rules) +from sandcrawler.ia import (CdxApiClient, NoCaptureError, ResourceResult, WaybackClient, + WaybackContentError, cdx_to_dict, fix_transfer_encoding) +from sandcrawler.misc import (clean_url, datetime_to_cdx, gen_file_metadata, parse_cdx_datetime, + url_fuzzy_equal) TRAFILATURA_AGENT = f"trafilatura/{trafilatura.__version__}" + def html_extract_body_teixml(doc: bytes) -> dict: try: - tei_xml = trafilatura.extract(doc, + tei_xml = trafilatura.extract( + doc, output_format='xmltei', include_comments=False, include_formatting=True, @@ -33,13 +36,19 @@ def html_extract_body_teixml(doc: bytes) -> dict: if tei_xml: body_txt = teixml_body_text(tei_xml) word_count = len(body_txt.split()) - return dict(status="success", agent=TRAFILATURA_AGENT, tei_xml=tei_xml, word_count=word_count) - elif doc.startswith(b''): + return dict(status="success", + agent=TRAFILATURA_AGENT, + tei_xml=tei_xml, + word_count=word_count) + elif doc.startswith( + b'' + ): # hack for firstmonday.org return html_extract_body_teixml(doc[106:]) else: return dict(status="empty-xml", agent=TRAFILATURA_AGENT) + def teixml_body_text(doc_xml: str) -> str: ns = {"tei": "http://www.tei-c.org/ns/1.0"} tree = ET.fromstring(doc_xml) @@ -49,6 +58,7 @@ def teixml_body_text(doc_xml: str) -> str: else: return "" + class WebResource(pydantic.BaseModel): surt: str timestamp: datetime.datetime @@ -61,16 +71,15 @@ class WebResource(pydantic.BaseModel): resource_type: Optional[str] class Config: - json_encoders = { - datetime.datetime: lambda dt: dt.isoformat() - } + json_encoders = {datetime.datetime: lambda dt: dt.isoformat()} + class IngestWebResult(pydantic.BaseModel): status: str hit: bool error_message: Optional[str] cdx: Optional[dict] - terminal: Optional[Any] # TODO + terminal: Optional[Any] # TODO request: Optional[Any] # TODO file_meta: Optional[dict] html_biblio: Optional[BiblioMetadata] @@ -84,6 +93,7 @@ class IngestWebResult(pydantic.BaseModel): datetime.datetime: lambda dt: dt.isoformat(), } + class HtmlMetaRow(pydantic.BaseModel): sha1hex: str status: str @@ -106,7 +116,7 @@ class HtmlMetaRow(pydantic.BaseModel): """ return ( self.sha1hex, - datetime.datetime.now(), # updated + datetime.datetime.now(), # updated self.status, self.scope, self.has_teixml, @@ -117,7 +127,8 @@ class 
HtmlMetaRow(pydantic.BaseModel): ) -def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, when: Optional[datetime.datetime]) -> List[WebResource]: +def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, + when: Optional[datetime.datetime]) -> List[WebResource]: """ This is the lazy version that just does a CDX lookup for each resource. @@ -132,27 +143,30 @@ def quick_fetch_html_resources(resources: List[dict], cdx_client: CdxApiClient, if not cdx_row: raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") if cdx_row.url != resource['url'] and not url_fuzzy_equal(cdx_row.url, resource['url']): - print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", file=sys.stderr) + print(f" WARN: CDX fuzzy match: {cdx_row.url} != {resource['url']}", + file=sys.stderr) if not cdx_row.status_code: # TODO: fall back to a full fetch? print(f" WARN: skipping revisit record", file=sys.stderr) continue - full.append(WebResource( - surt=cdx_row.surt, - timestamp=cdx_row.datetime, - url=cdx_row.url, - sha1hex=cdx_row.sha1hex, - mimetype=cdx_row.mimetype, - status_code=cdx_row.status_code, - size=None, - sha256hex=None, - resource_type=resource['type'], - )) + full.append( + WebResource( + surt=cdx_row.surt, + timestamp=cdx_row.datetime, + url=cdx_row.url, + sha1hex=cdx_row.sha1hex, + mimetype=cdx_row.mimetype, + status_code=cdx_row.status_code, + size=None, + sha256hex=None, + resource_type=resource['type'], + )) return full -def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, when: Optional[datetime.datetime]) -> List[WebResource]: +def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, + when: Optional[datetime.datetime]) -> List[WebResource]: """ This is the full version which fetches each resource from wayback/petabox and calculates additional hashes. 
@@ -168,23 +182,28 @@ def fetch_html_resources(resources: List[dict], wayback_client: WaybackClient, w raise NoCaptureError(f"HTML sub-resource not found: {resource['url']}") file_meta = gen_file_metadata(wayback_resp.body, allow_empty=True) if file_meta['sha1hex'] != wayback_resp.cdx.sha1hex: - raise WaybackContentError(f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}") - full.append(WebResource( - surt=wayback_resp.cdx.surt, - timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime), - url=wayback_resp.cdx.url, - sha1hex=file_meta['sha1hex'], - mimetype=file_meta['mimetype'], - status_code=wayback_resp.cdx.status_code or wayback_resp.revisit_cdx.status_code, - size=file_meta['size_bytes'], - sha256hex=file_meta['sha256hex'], - resource_type=resource['type'], - )) + raise WaybackContentError( + f"wayback payload sha1hex mismatch: {wayback_resp.cdx.datetime} {wayback_resp.cdx.url}" + ) + full.append( + WebResource( + surt=wayback_resp.cdx.surt, + timestamp=parse_cdx_datetime(wayback_resp.cdx.datetime), + url=wayback_resp.cdx.url, + sha1hex=file_meta['sha1hex'], + mimetype=file_meta['mimetype'], + status_code=wayback_resp.cdx.status_code + or wayback_resp.revisit_cdx.status_code, + size=file_meta['size_bytes'], + sha256hex=file_meta['sha256hex'], + resource_type=resource['type'], + )) return full -def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata]) -> Optional[str]: +def html_guess_platform(url: str, doc: HTMLParser, + biblio: Optional[BiblioMetadata]) -> Optional[str]: generator: Optional[str] = None generator_elem = doc.css_first("meta[name='generator']") @@ -229,7 +248,9 @@ def html_guess_platform(url: str, doc: HTMLParser, biblio: Optional[BiblioMetada return None -def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], word_count: Optional[int]) -> str: + +def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata], + word_count: Optional[int]) -> str: """ This function tries to guess if an HTML document represents one of: @@ -328,7 +349,9 @@ def html_guess_scope(url: str, doc: HTMLParser, biblio: Optional[BiblioMetadata] return "unknown" -def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = False) -> IngestWebResult: +def run_single(url: str, + timestamp: Optional[str] = None, + quick_mode: bool = False) -> IngestWebResult: adblock = load_adblock_rules() wayback_client = WaybackClient() @@ -375,7 +398,8 @@ def run_single(url: str, timestamp: Optional[str] = None, quick_mode: bool = Fal full_resources: List[WebResource] = [] if quick_mode: - full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, when) + full_resources = quick_fetch_html_resources(raw_resources, wayback_client.cdx_client, + when) else: full_resources = fetch_html_resources(raw_resources, wayback_client, when) @@ -399,14 +423,11 @@ def main() -> None: python -m sandcrawler.ingest_html """ - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) subparsers = parser.add_subparsers() - sub = subparsers.add_parser( - "single", help="tries to ingest a single URL, dumps result to stdout" - ) + sub = subparsers.add_parser("single", + help="tries to ingest a single URL, dumps result to stdout") sub.set_defaults(func="run_single") sub.add_argument( "url", @@ -437,5 +458,6 @@ def main() -> None: #func() raise NotImplementedError() + 
if __name__ == "__main__": main() diff --git a/python/sandcrawler/minio.py b/python/sandcrawler/minio.py index b617178..188621f 100644 --- a/python/sandcrawler/minio.py +++ b/python/sandcrawler/minio.py @@ -1,4 +1,3 @@ - import hashlib import io import os @@ -7,7 +6,6 @@ import minio class SandcrawlerMinioClient(object): - def __init__(self, host_url, access_key, secret_key, default_bucket=None): """ host is minio connection string (host:port) diff --git a/python/sandcrawler/misc.py b/python/sandcrawler/misc.py index cf8c4bd..ddbd95a 100644 --- a/python/sandcrawler/misc.py +++ b/python/sandcrawler/misc.py @@ -1,4 +1,3 @@ - import base64 import datetime import hashlib @@ -19,22 +18,28 @@ def clean_url(s: str) -> str: parsed.colon_before_port = b'' return str(urlcanon.whatwg(parsed)) + def url_fuzzy_equal(left: str, right: str) -> bool: """ TODO: use proper surt library and canonicalization for this check """ - fuzzy_left = '://'.join(clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:]) - fuzzy_right = '://'.join(clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:]) + fuzzy_left = '://'.join( + clean_url(left).replace('www.', '').replace(':80/', '/').split('://')[1:]) + fuzzy_right = '://'.join( + clean_url(right).replace('www.', '').replace(':80/', '/').split('://')[1:]) if fuzzy_left == fuzzy_right: return True elif fuzzy_left == fuzzy_right + "/" or fuzzy_right == fuzzy_left + "/": return True return False + def test_url_fuzzy_equal() -> None: assert True == url_fuzzy_equal( "http://www.annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree", - "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree") + "http://annalsofian.org/article.asp?issn=0972-2327;year=2014;volume=17;issue=4;spage=463;epage=465;aulast=Nithyashree" + ) + def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict: """ @@ -45,10 +50,10 @@ def gen_file_metadata(blob: bytes, allow_empty: bool = False) -> dict: assert blob is not None if not allow_empty: assert blob - if len(blob) < 1024*1024: + if len(blob) < 1024 * 1024: mimetype = magic.Magic(mime=True).from_buffer(blob) else: - mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024*1024)]) + mimetype = magic.Magic(mime=True).from_buffer(blob[:(1024 * 1024)]) if mimetype in ("application/xml", "text/xml"): # crude checks for XHTML or JATS XML, using only first 1 kB of file if b" dict: mimetype=mimetype, ) + def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict: """ Variant of gen_file_metadata() which works with files on local disk @@ -92,7 +98,7 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict: size_bytes = 0 with open(path, 'rb') as f: while True: - chunk = f.read(1024*1024) + chunk = f.read(1024 * 1024) if not chunk: break size_bytes += len(chunk) @@ -108,6 +114,7 @@ def gen_file_metadata_path(path: str, allow_empty: bool = False) -> dict: mimetype=mimetype, ) + def b32_hex(s: str) -> str: """ Converts a base32-encoded SHA-1 checksum into hex-encoded @@ -123,6 +130,7 @@ def b32_hex(s: str) -> str: raise ValueError("not a base-32 encoded SHA-1 hash: {}".format(s)) return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + NORMAL_MIME = ( 'application/pdf', 'application/postscript', @@ -131,6 +139,7 @@ NORMAL_MIME = ( 'application/octet-stream', ) + def normalize_mime(raw: str) -> Optional[str]: raw = raw.lower().strip() for norm 
in NORMAL_MIME: @@ -142,9 +151,7 @@ def normalize_mime(raw: str) -> Optional[str]: return 'text/xml' if raw.startswith('application/x-pdf'): return 'application/pdf' - if raw in ( - '.pdf', - ): + if raw in ('.pdf', ): return 'application/pdf' if raw in ( 'application/download', @@ -154,7 +161,7 @@ def normalize_mime(raw: str) -> Optional[str]: 'application/octetstream', 'application/force-download', 'application/unknown', - ): + ): return 'application/octet-stream' return None @@ -193,8 +200,8 @@ def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]: offset = cdx[9] warc = cdx[10] - if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit() - and len(sha1b32) == 32 and dt.isdigit()): + if not (sha1b32.isalnum() and c_size.isdigit() and offset.isdigit() and len(sha1b32) == 32 + and dt.isdigit()): return None if '-' in (surt, dt, url, http_status, sha1b32, c_size, offset, warc): @@ -221,6 +228,7 @@ def parse_cdx_line(raw_cdx: str, normalize=True) -> Optional[dict]: warc_path=warc, ) + def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]: if not dt_str: return None @@ -229,23 +237,39 @@ def parse_cdx_datetime(dt_str: str) -> Optional[datetime.datetime]: except Exception: return None + def test_parse_cdx_datetime() -> None: assert parse_cdx_datetime("") == None assert parse_cdx_datetime("asdf") == None assert parse_cdx_datetime("19930203123045") != None - assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3) + assert parse_cdx_datetime("20201028235103") == datetime.datetime(year=2020, + month=10, + day=28, + hour=23, + minute=51, + second=3) + def datetime_to_cdx(dt: datetime.datetime) -> str: return '%04d%02d%02d%02d%02d%02d' % ( - dt.year, dt.month, dt.day, - dt.hour, dt.minute, dt.second, + dt.year, + dt.month, + dt.day, + dt.hour, + dt.minute, + dt.second, ) + def test_datetime_to_cdx() -> None: - assert "20201028235103" == datetime_to_cdx(datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)) + assert "20201028235103" == datetime_to_cdx( + datetime.datetime(year=2020, month=10, day=28, hour=23, minute=51, second=3)) + -def requests_retry_session(retries=10, backoff_factor=3, - status_forcelist=(500, 502, 504), session=None) -> requests.Session: +def requests_retry_session(retries=10, + backoff_factor=3, + status_forcelist=(500, 502, 504), + session=None) -> requests.Session: """ From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests """ @@ -262,6 +286,7 @@ def requests_retry_session(retries=10, backoff_factor=3, session.mount('https://', adapter) return session + def sanitize_fs_path(path: str) -> str: """ From: https://stackoverflow.com/questions/13939120/sanitizing-a-file-path-in-python/66950540#66950540 @@ -271,6 +296,7 @@ def sanitize_fs_path(path: str) -> str: # - making the path relative return os.path.relpath(os.path.normpath(os.path.join("/", path)), "/") + def test_sanitize_fs_path() -> None: assert sanitize_fs_path("/thing.png") == "thing.png" assert sanitize_fs_path("../../thing.png") == "thing.png" diff --git a/python/sandcrawler/pdfextract.py b/python/sandcrawler/pdfextract.py index 2fb34b8..190672d 100644 --- a/python/sandcrawler/pdfextract.py +++ b/python/sandcrawler/pdfextract.py @@ -1,4 +1,3 @@ - import datetime import json import sys @@ -153,19 +152,20 @@ BAD_PDF_SHA1HEX = [ "fd9bd560662e070b222d63052830837829c490f0", ] + @dataclass class PdfExtractResult: sha1hex: str status: str error_msg: Optional[str] = None - 
file_meta: Optional[Dict[str,Any]] = None + file_meta: Optional[Dict[str, Any]] = None text: Optional[str] = None page0_thumbnail: Optional[bytes] = None has_page0_thumbnail: bool = False meta_xml: Optional[str] = None - pdf_info: Optional[Dict[str,Any]] = None - pdf_extra: Optional[Dict[str,Any]] = None - source: Optional[Dict[str,Any]] = None + pdf_info: Optional[Dict[str, Any]] = None + pdf_extra: Optional[Dict[str, Any]] = None + source: Optional[Dict[str, Any]] = None def to_pdftext_dict(self) -> dict: """ @@ -221,7 +221,8 @@ class PdfExtractResult: ) else: pdf_extra = dict() - for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id', 'pdf_version'): + for k in ('page_count', 'page0_height', 'page0_width', 'permanent_id', + 'pdf_version'): if record.get(k): pdf_extra[k] = record[k] return PdfExtractResult( @@ -255,7 +256,7 @@ class PdfExtractResult: metadata_json = json.dumps(metadata, sort_keys=True) return ( self.sha1hex, - datetime.datetime.now(), # updated + datetime.datetime.now(), # updated self.status, self.has_page0_thumbnail, pdf_extra.get('page_count'), @@ -269,7 +270,7 @@ class PdfExtractResult: ) -def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtractResult: +def process_pdf(blob: bytes, thumb_size=(180, 300), thumb_type="JPEG") -> PdfExtractResult: """ A known issue is that output text is in "physical layout" mode, which means columns will be side-by-side. We would prefer a single stream of tokens! @@ -330,7 +331,8 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr renderer = poppler.PageRenderer() try: full_img = renderer.render_page(page0) - img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw', "BGRA", 0, 1) + img = Image.frombuffer("RGBA", (full_img.width, full_img.height), full_img.data, 'raw', + "BGRA", 0, 1) img.thumbnail(thumb_size, Image.BICUBIC) buf = BytesIO() img.save(buf, thumb_type) @@ -356,14 +358,14 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr ) # Kafka message size limit; cap at about 1 MByte - if len(full_text)> 1000000: + if len(full_text) > 1000000: return PdfExtractResult( sha1hex=sha1hex, status='text-too-large', error_msg="full_text chars: {}".format(len(full_text)), file_meta=file_meta, ) - if len(pdf.metadata)> 1000000: + if len(pdf.metadata) > 1000000: return PdfExtractResult( sha1hex=sha1hex, status='text-too-large', @@ -414,8 +416,8 @@ def process_pdf(blob: bytes, thumb_size=(180,300), thumb_type="JPEG") -> PdfExtr ), ) -class PdfExtractWorker(SandcrawlerFetchWorker): +class PdfExtractWorker(SandcrawlerFetchWorker): def __init__(self, wayback_client=None, sink=None, **kwargs): super().__init__(wayback_client=wayback_client) self.wayback_client = wayback_client @@ -445,12 +447,12 @@ class PdfExtractWorker(SandcrawlerFetchWorker): self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) return result.to_pdftext_dict() + class PdfExtractBlobWorker(SandcrawlerWorker): """ This is sort of like PdfExtractWorker, except it receives blobs directly, instead of fetching blobs from some remote store. 
""" - def __init__(self, sink=None, **kwargs): super().__init__() self.sink = sink @@ -466,4 +468,3 @@ class PdfExtractBlobWorker(SandcrawlerWorker): self.thumbnail_sink.push_record(result.page0_thumbnail, key=result.sha1hex) return result.to_pdftext_dict() - diff --git a/python/sandcrawler/pdftrio.py b/python/sandcrawler/pdftrio.py index 7d03357..e3d4a54 100644 --- a/python/sandcrawler/pdftrio.py +++ b/python/sandcrawler/pdftrio.py @@ -1,4 +1,3 @@ - import time import requests @@ -8,7 +7,6 @@ from .workers import SandcrawlerFetchWorker, SandcrawlerWorker class PdfTrioClient(object): - def __init__(self, host_url="http://pdftrio.qa.fatcat.wiki", **kwargs): self.host_url = host_url self.http_session = requests_retry_session(retries=3, backoff_factor=3) @@ -51,9 +49,7 @@ class PdfTrioClient(object): 'error_msg': 'pdftrio request connection timout', } - info = dict( - status_code=pdftrio_response.status_code, - ) + info = dict(status_code=pdftrio_response.status_code, ) if pdftrio_response.status_code == 200: resp_json = pdftrio_response.json() assert 'ensemble_score' in resp_json @@ -72,7 +68,6 @@ class PdfTrioWorker(SandcrawlerFetchWorker): """ This class is basically copied directly from GrobidWorker """ - def __init__(self, pdftrio_client, wayback_client=None, sink=None, **kwargs): super().__init__(wayback_client=wayback_client) self.pdftrio_client = pdftrio_client @@ -103,12 +98,12 @@ class PdfTrioWorker(SandcrawlerFetchWorker): result['timing']['fetch_sec'] = fetch_sec return result + class PdfTrioBlobWorker(SandcrawlerWorker): """ This is sort of like PdfTrioWorker, except it receives blobs directly, instead of fetching blobs from some remote store. """ - def __init__(self, pdftrio_client, sink=None, mode="auto", **kwargs): super().__init__() self.pdftrio_client = pdftrio_client @@ -128,4 +123,3 @@ class PdfTrioBlobWorker(SandcrawlerWorker): total_sec=time.time() - start_process, ) return result - diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 66a36bc..44c03f2 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -1,4 +1,3 @@ - """ cdx - read raw CDX, filter @@ -32,7 +31,6 @@ from sandcrawler.workers import SandcrawlerWorker class PersistCdxWorker(SandcrawlerWorker): - def __init__(self, db_url, **kwargs): super().__init__() self.db = SandcrawlerPostgresClient(db_url) @@ -56,8 +54,8 @@ class PersistCdxWorker(SandcrawlerWorker): self.db.commit() return [] -class PersistIngestFileResultWorker(SandcrawlerWorker): +class PersistIngestFileResultWorker(SandcrawlerWorker): def __init__(self, db_url, **kwargs): super().__init__() self.db = SandcrawlerPostgresClient(db_url) @@ -78,8 +76,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): # backwards compat hacks; transform request to look like current schema if raw.get('ingest_type') == 'file': raw['ingest_type'] = 'pdf' - if (not raw.get('link_source') - and raw.get('base_url') + if (not raw.get('link_source') and raw.get('base_url') and raw.get('ext_ids', {}).get('doi') and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])): # set link_source(_id) for old ingest requests @@ -119,7 +116,6 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if not request['request']: request['request'] = None return request - def file_result_to_row(self, raw: dict) -> Optional[dict]: """ @@ -137,7 +133,8 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): ingest_type = raw['request'].get('ingest_type') if ingest_type == 'file': ingest_type = 'pdf' - if 
ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', 'dataset-file'): + if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', + 'dataset-file'): self.counts['skip-ingest-type'] += 1 return None if raw['status'] in ("existing", ): @@ -153,7 +150,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): if terminal: result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url') result['terminal_dt'] = terminal.get('terminal_dt') - result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code') + result['terminal_status_code'] = terminal.get( + 'terminal_status_code') or terminal.get('status_code') or terminal.get( + 'http_code') if result['terminal_status_code']: result['terminal_status_code'] = int(result['terminal_status_code']) result['terminal_sha1hex'] = terminal.get('terminal_sha1hex') @@ -215,9 +214,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): 'manifest': raw.get('manifest'), } if result.get('fileset_bundle'): - result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get('archiveorg_item_bundle_path') - result['web_bundle_url'] = result['fileset_bundle'].get('terminal', {}).get('terminal_url') - result['web_bundle_dt'] = result['fileset_bundle'].get('terminal', {}).get('terminal_dt') + result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get( + 'archiveorg_item_bundle_path') + result['web_bundle_url'] = result['fileset_bundle'].get('terminal', + {}).get('terminal_url') + result['web_bundle_dt'] = result['fileset_bundle'].get('terminal', + {}).get('terminal_dt') return result def push_batch(self, batch): @@ -243,7 +245,9 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): # these schemas match, so can just pass through cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')] - revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')] + revisit_cdx_batch = [ + r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx') + ] cdx_batch.extend(revisit_cdx_batch) # filter to full CDX lines, with full warc_paths (not liveweb) cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])] @@ -258,24 +262,31 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): self.counts['insert-file_meta'] += resp[0] self.counts['update-file_meta'] += resp[1] - html_meta_batch = [self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body')] + html_meta_batch = [ + self.result_to_html_meta(r) for r in batch if r.get('hit') and r.get('html_body') + ] if html_meta_batch: resp = self.db.insert_html_meta(self.cur, html_meta_batch, on_conflict="update") self.counts['insert-html_meta'] += resp[0] self.counts['update-html_meta'] += resp[1] - fileset_platform_batch = [self.result_to_platform_row(raw) for raw in batch if raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')] + fileset_platform_batch = [ + self.result_to_platform_row(raw) for raw in batch if + raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name') + ] fileset_platform_batch = [p for p in fileset_platform_batch if p] if fileset_platform_batch: - resp = self.db.insert_ingest_fileset_platform(self.cur, fileset_platform_batch, on_conflict="update") + resp = self.db.insert_ingest_fileset_platform(self.cur, + fileset_platform_batch, + on_conflict="update") self.counts['insert-fileset_platform'] += resp[0] 
self.counts['update-fileset_platform'] += resp[1] self.db.commit() return [] -class PersistIngestFilesetWorker(SandcrawlerWorker): +class PersistIngestFilesetWorker(SandcrawlerWorker): def __init__(self, db_url, **kwargs): super().__init__() self.db = SandcrawlerPostgresClient(db_url) @@ -287,8 +298,8 @@ class PersistIngestFilesetWorker(SandcrawlerWorker): """ raise NotImplementedError -class PersistIngestRequestWorker(PersistIngestFileResultWorker): +class PersistIngestRequestWorker(PersistIngestFileResultWorker): def __init__(self, db_url, **kwargs): super().__init__(db_url=db_url) @@ -315,8 +326,8 @@ class PersistIngestRequestWorker(PersistIngestFileResultWorker): self.db.commit() return [] -class PersistGrobidWorker(SandcrawlerWorker): +class PersistGrobidWorker(SandcrawlerWorker): def __init__(self, db_url, **kwargs): super().__init__() self.grobid = GrobidClient() @@ -406,7 +417,6 @@ class PersistGrobidDiskWorker(SandcrawlerWorker): This could be refactored into a "Sink" type with an even thinner wrapper. """ - def __init__(self, output_dir): super().__init__() self.output_dir = output_dir @@ -424,7 +434,7 @@ class PersistGrobidDiskWorker(SandcrawlerWorker): if record.get('status_code') != 200 or not record.get('tei_xml'): return False - assert(len(record['key'])) == 40 + assert (len(record['key'])) == 40 p = "{}/{}".format(self.output_dir, self._blob_path(record['key'])) os.makedirs(os.path.dirname(p), exist_ok=True) with open(p, 'w') as f: @@ -434,7 +444,6 @@ class PersistGrobidDiskWorker(SandcrawlerWorker): class PersistPdfTrioWorker(SandcrawlerWorker): - def __init__(self, db_url, **kwargs): super().__init__() self.db = SandcrawlerPostgresClient(db_url) @@ -458,7 +467,10 @@ class PersistPdfTrioWorker(SandcrawlerWorker): self.counts['insert-pdftrio'] += resp[0] self.counts['update-pdftrio'] += resp[1] - file_meta_batch = [r['file_meta'] for r in batch if r['pdf_trio']['status'] == "success" and r.get('file_meta')] + file_meta_batch = [ + r['file_meta'] for r in batch + if r['pdf_trio']['status'] == "success" and r.get('file_meta') + ] resp = self.db.insert_file_meta(self.cur, file_meta_batch) self.counts['insert-file-meta'] += resp[0] self.counts['update-file-meta'] += resp[1] @@ -473,7 +485,6 @@ class PersistPdfTextWorker(SandcrawlerWorker): Should keep batch sizes small. """ - def __init__(self, db_url, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( @@ -545,7 +556,6 @@ class PersistThumbnailWorker(SandcrawlerWorker): This worker *must* be used with raw kakfa mode; thumbnails are *not* wrapped in JSON like most sandcrawler kafka messages. """ - def __init__(self, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( @@ -583,7 +593,6 @@ class GenericPersistDocWorker(SandcrawlerWorker): Objects are assumed to be JSON-wrapped strings. """ - def __init__(self, **kwargs): super().__init__() self.s3 = SandcrawlerMinioClient( @@ -624,7 +633,6 @@ class PersistXmlDocWorker(GenericPersistDocWorker): Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to sandcrawler database (SQL). """ - def __init__(self, **kwargs): super().__init__(**kwargs) self.s3_extension = kwargs.get('s3_extension', ".jats.xml") @@ -637,7 +645,6 @@ class PersistHtmlTeiXmlWorker(GenericPersistDocWorker): Pushes TEI-XML file to blob store (S3/seaweed/minio). Does not talk to sandcrawler database (SQL). 
""" - def __init__(self, **kwargs): super().__init__(**kwargs) self.s3_extension = kwargs.get('s3_extension', ".tei.xml") diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index d8a4016..7135f4c 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -1,4 +1,3 @@ - import json import multiprocessing.pool import signal @@ -21,7 +20,6 @@ class SandcrawlerWorker(object): Usually these get "pushed" into by a RecordPusher. Output goes to another worker (pipeline-style), or defaults to stdout. """ - def __init__(self): self.counts = Counter() self.sink = None @@ -62,9 +60,9 @@ class SandcrawlerWorker(object): multithreading or if signal-based timeouts are used elsewhere in the same process. """ - def timeout_handler(signum, frame): raise TimeoutError("timeout processing record") + signal.signal(signal.SIGALRM, timeout_handler) resp = None signal.alarm(int(timeout)) @@ -72,7 +70,7 @@ class SandcrawlerWorker(object): resp = self.push_record(task, key=key) except TimeoutError: self.counts['timeout'] += 1 - resp = self.timeout_response(task) # pylint: disable=assignment-from-none + resp = self.timeout_response(task) # pylint: disable=assignment-from-none # TODO: what if it is this push_record() itself that is timing out? if resp and self.sink: self.sink.push_record(resp) @@ -113,7 +111,6 @@ class SandcrawlerFetchWorker(SandcrawlerWorker): Wrapper of SandcrawlerWorker that adds a helper method to fetch blobs (eg, PDFs) from wayback, archive.org, or other sources. """ - def __init__(self, wayback_client, **kwargs): super().__init__(**kwargs) self.wayback_client = wayback_client @@ -178,7 +175,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker): ) blob = resp.content else: - raise ValueError("not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") + raise ValueError( + "not a CDX (wayback) or petabox (archive.org) dict; not sure how to proceed") if not blob: return dict( key=default_key, @@ -192,8 +190,8 @@ class SandcrawlerFetchWorker(SandcrawlerWorker): blob=blob, ) -class MultiprocessWrapper(SandcrawlerWorker): +class MultiprocessWrapper(SandcrawlerWorker): def __init__(self, worker, sink, jobs=None): self.counts = Counter() self.worker = worker @@ -226,21 +224,21 @@ class MultiprocessWrapper(SandcrawlerWorker): print("Multiprocessing: {}".format(self.counts), file=sys.stderr) return worker_counts + class BlackholeSink(SandcrawlerWorker): """ Dummy SandcrawlerWorker. That doesn't do or process anything. Useful for tests. 
""" - def push_record(self, task, key=None): return def push_batch(self, tasks): return -class KafkaSink(SandcrawlerWorker): +class KafkaSink(SandcrawlerWorker): def __init__(self, kafka_hosts, produce_topic, **kwargs): self.sink = None self.counts = Counter() @@ -249,13 +247,12 @@ class KafkaSink(SandcrawlerWorker): config = self.producer_config({ 'bootstrap.servers': kafka_hosts, - 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes + 'message.max.bytes': 30000000, # ~30 MBytes; broker is ~50 MBytes 'api.version.request': True, 'api.version.fallback.ms': 0, }) self.producer = Producer(config) - @staticmethod def _fail_fast(err, msg): if err is not None: @@ -270,7 +267,7 @@ class KafkaSink(SandcrawlerWorker): 'delivery.report.only.error': True, 'default.topic.config': { 'message.timeout.ms': 30000, - 'request.required.acks': -1, # all brokers must confirm + 'request.required.acks': -1, # all brokers must confirm } }) return config @@ -285,11 +282,7 @@ class KafkaSink(SandcrawlerWorker): msg = msg.encode('utf-8') assert type(msg) == bytes - self.producer.produce( - self.produce_topic, - msg, - key=key, - on_delivery=self._fail_fast) + self.producer.produce(self.produce_topic, msg, key=key, on_delivery=self._fail_fast) self.counts['produced'] += 1 # check for errors etc @@ -308,7 +301,6 @@ class KafkaCompressSink(KafkaSink): """ Variant of KafkaSink for large documents. Used for, eg, GROBID output. """ - def producer_config(self, kafka_config): config = kafka_config.copy() config.update({ @@ -319,7 +311,7 @@ class KafkaCompressSink(KafkaSink): 'delivery.report.only.error': True, 'default.topic.config': { 'message.timeout.ms': 30000, - 'request.required.acks': -1, # all brokers must confirm + 'request.required.acks': -1, # all brokers must confirm } }) return config @@ -330,7 +322,6 @@ class RecordPusher: Base class for different record sources to be pushed into workers. Pretty trivial interface, just wraps an importer and pushes records in to it. 
""" - def __init__(self, worker, **kwargs): self.counts = Counter() self.worker = worker @@ -348,7 +339,6 @@ class RecordPusher: class JsonLinePusher(RecordPusher): - def __init__(self, worker, json_file, **kwargs): self.counts = Counter() self.worker = worker @@ -387,7 +377,6 @@ class JsonLinePusher(RecordPusher): class CdxLinePusher(RecordPusher): - def __init__(self, worker, cdx_file, **kwargs): self.counts = Counter() self.worker = worker @@ -409,7 +398,8 @@ class CdxLinePusher(RecordPusher): if not record: self.counts['skip-parse'] += 1 continue - if self.filter_http_statuses and record['http_status'] not in self.filter_http_statuses: + if self.filter_http_statuses and record[ + 'http_status'] not in self.filter_http_statuses: self.counts['skip-http_status'] += 1 continue if self.filter_mimetypes and record['mimetype'] not in self.filter_mimetypes: @@ -434,7 +424,6 @@ class CdxLinePusher(RecordPusher): class ZipfilePusher(RecordPusher): - def __init__(self, worker, zipfile_path, **kwargs): self.counts = Counter() self.worker = worker @@ -472,8 +461,8 @@ class ZipfilePusher(RecordPusher): print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr) return self.counts -class KafkaJsonPusher(RecordPusher): +class KafkaJsonPusher(RecordPusher): def __init__(self, worker, kafka_hosts, consume_topic, group, **kwargs): self.counts = Counter() self.worker = worker @@ -499,12 +488,11 @@ class KafkaJsonPusher(RecordPusher): # case where there there is one update and thousands of creates; # update would be lingering in worker, and if worker crashed # never created. Not great. - batch = self.consumer.consume( - num_messages=self.batch_size, - timeout=self.poll_interval) + batch = self.consumer.consume(num_messages=self.batch_size, + timeout=self.poll_interval) print("... got {} kafka messages ({}sec poll interval)".format( - len(batch), self.poll_interval), - file=sys.stderr) + len(batch), self.poll_interval), + file=sys.stderr) if not batch: # TODO: could have some larger timeout here and # self.worker.finish() if it's been more than, eg, a couple @@ -541,7 +529,9 @@ class KafkaJsonPusher(RecordPusher): while not done: try: # use timeouts; don't want kafka itself to timeout - self.worker.push_record_timeout(record, key=msg.key(), timeout=self.process_timeout_sec) + self.worker.push_record_timeout(record, + key=msg.key(), + timeout=self.process_timeout_sec) break except SandcrawlerBackoffError as be: print("Backing off for 200 seconds: {}".format(be)) @@ -611,14 +601,14 @@ def make_kafka_consumer(hosts, consume_topic, group): for p in partitions: if p.error: raise KafkaException(p.error) - print("Kafka partitions rebalanced: {} / {}".format( - consumer, partitions), - file=sys.stderr) + print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions), + file=sys.stderr) consumer = Consumer(conf) # NOTE: it's actually important that topic_name *not* be bytes (UTF-8 # encoded) - consumer.subscribe([topic_name], + consumer.subscribe( + [topic_name], on_assign=on_rebalance, on_revoke=on_rebalance, ) diff --git a/python/sandcrawler/xml.py b/python/sandcrawler/xml.py index 7a0086d..83d53d4 100644 --- a/python/sandcrawler/xml.py +++ b/python/sandcrawler/xml.py @@ -1,4 +1,3 @@ - import xml.etree.ElementTree as ET -- cgit v1.2.3