From b3447503c0aa2e326ce1e46c993be28f907ec23b Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Mon, 4 Oct 2021 16:13:55 -0700
Subject: progress on dataset ingest

---
 python/sandcrawler/fileset_platforms.py  | 171 +++++++++++++++++++++--
 python/sandcrawler/fileset_strategies.py |  57 +++++++-
 python/sandcrawler/fileset_types.py      |   1 +
 python/sandcrawler/ingest_fileset.py     | 226 ++++++++++++++++---------------
 4 files changed, 333 insertions(+), 122 deletions(-)

diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py
index 7aeacf2..5342a4e 100644
--- a/python/sandcrawler/fileset_platforms.py
+++ b/python/sandcrawler/fileset_platforms.py
@@ -6,34 +6,189 @@ import time
 from collections import namedtuple
 from typing import Optional, Tuple, Any, Dict, List
 
+import internetarchive
+
 from sandcrawler.html_metadata import BiblioMetadata
 from sandcrawler.ia import ResourceResult
+from sandcrawler.fileset_types import *
 
 
-class DatasetPlatformHelper(class):
+class DatasetPlatformHelper():
 
-    def __init__():
+    def __init__(self):
         self.platform_name = 'unknown'
 
-    def match_request(request: dict , resource: ResourceResult, html_biblio: Optional[BiblioMetadata]) -> bool:
+    def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
         """
         Does this request look like it matches this platform?
         """
-        raise NotImplemented
+        raise NotImplementedError()
 
-    def get_item(request: dict, resource: ResourceResult, html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
         """
         Fetch platform-specific metadata for this request (eg, via API calls)
         """
-        raise NotImplemented
+        raise NotImplementedError()
+
+    def chose_strategy(self, DatasetPlatformItem) -> IngestStrategy:
+        raise NotImplementedError()
 
 
 class DataverseHelper(DatasetPlatformHelper):
 
-    def __init__():
+    def __init__(self):
         self.platform_name = 'dataverse'
 
+    def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+        return False
+
+    def chose_strategy(self, DatasetPlatformItem) -> IngestStrategy:
+        raise NotImplementedError()
+
+
 class ArchiveOrgHelper(DatasetPlatformHelper):
 
-    def __init__():
+    FORMAT_TO_MIMETYPE = {
+        'BZIP': 'application/x-bzip',
+        'BZIP2': 'application/x-bzip2',
+        'ZIP': 'application/zip',
+        'GZIP': 'application/gzip',
+        'RAR': 'application/vnd.rar',
+        'TAR': 'application/x-tar',
+        '7z': 'application/x-7z-compressed',
+
+        'HTML': 'text/html',
+        'Text': 'text/plain',
+        'PDF': 'application/pdf',
+
+        'CSV': 'text/csv',
+        'XML': 'application/xml',
+        'JSON': 'application/json',
+
+        #'application/msword (.doc)', # .doc
+        #'application/vnd.openxmlformats-officedocument.wordprocessingml.document', # .docx
+        #'application/vnd.ms-excel', # .xls
+        #'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx
+
+        'MP3': 'audio/mpeg', # .mp3
+
+        'MP4': 'video/mp4', # .mp4
+        'MPEG': 'video/mpeg', # .mpeg
+
+        'JPEG': 'image/jpeg',
+        'GIF': 'image/gif',
+        'PNG': 'image/png',
+        'TIFF': 'image/tiff',
+
+        'Unknown': None,
+    }
+
+    def __init__(self):
         self.platform_name = 'archiveorg'
+        self.session = internetarchive.get_session()
+
+    @staticmethod
+    def want_item_file(f: dict, item_name: str) -> bool:
+        """
+        Filters IA API files
+        """
+        if f.source != 'original':
+            return False
+        for suffix in [
+            '_meta.sqlite',
+            '_archive.torrent',
+            '_itemimage.jpg',
+            '_meta.xml',
+            '_thumb.png',
+            '_files.xml',
+        ]:
+            if f.name == item_name + suffix or f.name == item_name.lower() + suffix:
+                return False
+        if f.name.startswith('_'):
+            return False
+        if item_name.startswith('academictorrents_'):
+            for suffix in ['_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib']:
+                if f.name == item_name + suffix:
+                    return False
+        return True
+
+    def parse_item_file(self, f: dict) -> FilesetManifestFile:
+        """
+        Takes an IA API file and turns it in to a fatcat fileset manifest file
+        """
+        assert f.name and f.sha1 and f.md5
+        assert f.name is not None
+        mf = {
+            'path': f.name,
+            'size': int(f.size),
+            'sha1': f.sha1,
+            'md5': f.md5,
+        }
+        # TODO: will disable this hard check eventually and replace with:
+        #mimetype = FORMAT_TO_MIMETYPE.get(f.format)
+        mimetype = self.FORMAT_TO_MIMETYPE[f.format]
+        if mimetype:
+            mf['extra'] = dict(mimetype=mimetype)
+        return mf
+
+
+    def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+        patterns = [
+            '://archive.org/details/',
+            '://archive.org/download/',
+        ]
+        for p in patterns:
+            if p in request['base_url']:
+                return True
+        return False
+
+    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+        """
+        Fetch platform-specific metadata for this request (eg, via API calls)
+
+        XXX: add platform_url (for direct download)
+        """
+
+        base_url_split = request['base_url'].split('/')
+        #print(base_url_split, file=sys.stderr)
+        assert len(base_url_split) == 5
+        assert base_url_split[0] in ['http:', 'https:']
+        assert base_url_split[2] == 'archive.org'
+        assert base_url_split[3] in ['details', 'download']
+        item_name = base_url_split[4]
+
+        print(f" archiveorg processing item={item_name}", file=sys.stderr)
+        item = self.session.get_item(item_name)
+        item_name = item.identifier
+        item_collection = item.metadata['collection']
+        if type(item_collection) == list:
+            item_collection = item_collection[0]
+        assert item.metadata['mediatype'] not in ['collection', 'web']
+        item_files = item.get_files(on_the_fly=False)
+        manifest = [self.parse_item_file(f) for f in item_files if self.want_item_file(f, item_name)]
+
+        return DatasetPlatformItem(
+            platform_name=self.platform_name,
+            platform_status='success',
+            manifest=manifest,
+            platform_domain='archive.org',
+            platform_id=item_name,
+            archiveorg_item_name=item_name,
+            archiveorg_collection=item_collection,
+        )
+
+    def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy:
+        if len(item.manifest) == 1:
+            # NOTE: code flow does not support ArchiveorgFilesetBundle for the
+            # case of, eg, a single zipfile in an archive.org item
+            return IngestStrategy.ArchiveorgFile
+        elif len(item.manifest) >= 1:
+            return IngestStrategy.ArchiveorgFileset
+        else:
+            raise NotImplementedError()
+
+
+DATASET_PLATFORM_HELPER_TABLE = {
+    'dataverse': DataverseHelper(),
+    'archiveorg': ArchiveOrgHelper(),
+}
diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
index 592b475..26bc5ad 100644
--- a/python/sandcrawler/fileset_strategies.py
+++ b/python/sandcrawler/fileset_strategies.py
@@ -6,17 +6,62 @@ import time
 from collections import namedtuple
 from typing import Optional, Tuple, Any, Dict, List
 
+import internetarchive
+
 from sandcrawler.html_metadata import BiblioMetadata
 from sandcrawler.ia import ResourceResult
-from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, DatasetPlatformItem
+from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, DatasetPlatformItem, ArchiveStrategyResult
+
+
+class FilesetIngestStrategy():
+
+    def __init__(self):
+        #self.ingest_strategy = 'unknown'
+        pass
+
+    def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]:
+        raise NotImplementedError()
+
+    def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult:
+        raise NotImplementedError()
+
+
+class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
+    def __init__(self):
+        self.ingest_strategy = IngestStrategy.ArchiveorgFileset
+        self.session = internetarchive.get_session()
 
-class FilesetIngestStrategy(class):
+    def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]:
+        """
+        use API to check for item with all the files in the manifest
+        TODO: this naive comparison is quadratic in number of files, aka O(N^2)
 
-    def __init__():
-        self.ingest_strategy = 'unknown'
+        XXX: should this verify sha256 and/or mimetype?
+        """
+        ia_item = self.session.get_item(item.archiveorg_item_name)
+        item_files = ia_item.get_files(on_the_fly=False)
+        for wanted in item.manifest:
+            found = False
+            for existing in item_files:
+                if existing.sha1 == wanted.sha1 and existing.name == wanted.path and existing.size == wanted.size:
+                    found = True
+                    break
+            if not found:
+                print(f" didn't find at least one file: {wanted}", file=sys.stderr)
+                return None
+        return ArchiveStrategyResult(
+            ingest_strategy=self.ingest_strategy,
+            status='success-existing',
+            manifest=item.manifest,
+        )
 
-    def check_existing(): # XXX: -> Any:
+    def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult:
+        existing = self.check_existing(item)
+        if existing:
+            return existing
         raise NotImplementedError()
 
-    def process(item: DatasetPlatformItem):
+FILESET_STRATEGY_HELPER_TABLE = {
+    IngestStrategy.ArchiveorgFileset: ArchiveorgFilesetStrategy(),
+}
diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py
index f0f03db..75f4992 100644
--- a/python/sandcrawler/fileset_types.py
+++ b/python/sandcrawler/fileset_types.py
@@ -1,5 +1,6 @@
 from enum import Enum
+from typing import Optional, Tuple, Any, Dict, List
 
 from pydantic import BaseModel
 
 
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index 9ffaa47..50e3c05 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -12,13 +12,16 @@ from selectolax.parser import HTMLParser
 from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding, NoCaptureError
 from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime
 from sandcrawler.html import extract_fulltext_url
-from sandcrawler.html_ingest import fetch_html_resources, \
+from sandcrawler.ingest_html import fetch_html_resources, \
     quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \
     WebResource, html_guess_platform
 from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules
 from sandcrawler.workers import SandcrawlerWorker
 from sandcrawler.db import SandcrawlerPostgrestClient
+from sandcrawler.ingest_file import IngestFileWorker
+from sandcrawler.fileset_platforms import DatasetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE
+from sandcrawler.fileset_strategies import FilesetIngestStrategy, FILESET_STRATEGY_HELPER_TABLE
 
 MAX_BODY_SIZE_BYTES = 128*1024*1024
 
@@ -41,6 +44,8 @@ class IngestFilesetWorker(IngestFileWorker):
 
         super().__init__(sink=None, **kwargs)
         self.sink = sink
+        self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE
+        self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
 
     def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
 
@@ -84,20 +89,16 @@ class IngestFilesetWorker(IngestFileWorker):
 
         force_recrawl = bool(request.get('force_recrawl', False))
 
-        for block in self.base_url_blocklist:
-            if block in base_url:
-                print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
-                return dict(request=request, hit=False, status="skip-url-blocklist")
-
         print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
 
-        # TODO
+        # TODO: "existing" check for new fileset ingest result table
         #existing = self.check_existing_ingest(ingest_type, base_url)
         #if existing:
        #    return self.process_existing(request, existing)
 
         result: Dict[str, Any] = dict(request=request, hit=False)
-        hops = [base_url]
+        result['hops'] = [base_url]
+        next_url = base_url
 
         # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
 
@@ -105,12 +106,15 @@ class IngestFilesetWorker(IngestFileWorker):
 
         # check against blocklist
         for block in self.base_url_blocklist:
+            # XXX: hack to not skip archive.org content
+            if 'archive.org' in block:
+                continue
             if block in next_url:
                 result['status'] = "skip-url-blocklist"
                 return result
 
         try:
-            resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl)
+            resource = self.find_resource(next_url, force_recrawl=force_recrawl)
         except SavePageNowError as e:
             result['status'] = 'spn2-error'
             result['error_message'] = str(e)[:1600]
@@ -134,109 +138,111 @@ class IngestFilesetWorker(IngestFileWorker):
             result['error_message'] = str(e)[:1600]
             return result
         except NotImplementedError as e:
-            result['status'] = 'not-implemented'
-            result['error_message'] = str(e)[:1600]
-            return result
-
-        assert resource
-
-        if resource.terminal_url:
-            result['terminal'] = {
-                "terminal_url": resource.terminal_url,
-                "terminal_dt": resource.terminal_dt,
-                "terminal_status_code": resource.terminal_status_code,
-            }
-            if resource.terminal_url not in result['hops']:
-                result['hops'].append(resource.terminal_url)
-
-        if not resource.hit:
-            result['status'] = resource.status
-            return result
+            #result['status'] = 'not-implemented'
+            #result['error_message'] = str(e)[:1600]
+            #return result
+            resource = None
 
-        if resource.terminal_url:
-            for pattern in self.base_url_blocklist:
-                if pattern in resource.terminal_url:
-                    result['status'] = 'skip-url-blocklist'
-                    return result
-
-        if resource.terminal_url:
-            for pattern in self.cookie_blocklist:
-                if pattern in resource.terminal_url:
-                    result['status'] = 'blocked-cookie'
-                    return result
+        html_biblio = None
+        if resource:
+            if resource.terminal_url:
+                result['terminal'] = {
+                    "terminal_url": resource.terminal_url,
+                    "terminal_dt": resource.terminal_dt,
+                    "terminal_status_code": resource.terminal_status_code,
+                }
+                if resource.terminal_url not in result['hops']:
+                    result['hops'].append(resource.terminal_url)
+
+            if not resource.hit:
+                result['status'] = resource.status
+                return result
 
-        if not resource.body:
-            result['status'] = 'null-body'
-            return result
+            if resource.terminal_url:
+                for pattern in self.base_url_blocklist:
+                    if pattern in resource.terminal_url:
+                        result['status'] = 'skip-url-blocklist'
+                        return result
 
-        if len(resource.body) > MAX_BODY_SIZE_BYTES:
-            result['status'] = 'body-too-large'
-            return result
+            if resource.terminal_url:
+                for pattern in self.cookie_blocklist:
+                    if pattern in resource.terminal_url:
+                        result['status'] = 'blocked-cookie'
+                        return result
 
-        file_meta = gen_file_metadata(resource.body)
-        try:
-            file_meta, resource = fix_transfer_encoding(file_meta, resource)
-        except Exception as e:
-            result['status'] = 'bad-gzip-encoding'
-            result['error_message'] = str(e)
-            return result
+            if not resource.body:
+                result['status'] = 'null-body'
+                return result
 
-        if not resource.body or file_meta['size_bytes'] == 0:
-            result['status'] = 'null-body'
-            return result
+            if len(resource.body) > MAX_BODY_SIZE_BYTES:
+                result['status'] = 'body-too-large'
+                return result
 
-        # here we split based on ingest type to try and extract a next hop
-        html_ish_resource = bool(
-            "html" in file_meta['mimetype']
-            or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
-            or "application/xml" in file_meta['mimetype']
-            or "text/xml" in file_meta['mimetype']
-        )
-        html_biblio = None
-        html_doc = None
-        if html_ish_resource and resource.body:
+            file_meta = gen_file_metadata(resource.body)
             try:
-                html_doc = HTMLParser(resource.body)
-                html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
-                if html_biblio:
-                    if not 'html_biblio' in result or html_biblio.title:
-                        result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True))
-                        #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
-            except ValueError:
-                pass
-
-        # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
-        assert resource
-        assert resource.hit == True
-        assert resource.terminal_status_code in (200, 226)
-
-        if resource.terminal_url:
-            result['terminal'] = {
-                "terminal_url": resource.terminal_url,
-                "terminal_dt": resource.terminal_dt,
-                "terminal_status_code": resource.terminal_status_code,
-                "terminal_sha1hex": file_meta['sha1hex'],
-            }
-
-        result['file_meta'] = file_meta
-        result['cdx'] = cdx_to_dict(resource.cdx)
-        if resource.revisit_cdx:
-            result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
-
-        if ingest_type == "pdf":
-            if file_meta['mimetype'] != "application/pdf":
-                result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
-                return result
-        elif ingest_type == "xml":
-            if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
-                result['status'] = "wrong-mimetype"
+                file_meta, resource = fix_transfer_encoding(file_meta, resource)
+            except Exception as e:
+                result['status'] = 'bad-gzip-encoding'
+                result['error_message'] = str(e)
                 return result
-        elif ingest_type == "html":
-            if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"):
-                result['status'] = "wrong-mimetype"
+
+            if not resource.body or file_meta['size_bytes'] == 0:
+                result['status'] = 'null-body'
                 return result
-        else:
-            raise NotImplementedError()
+
+            # here we split based on ingest type to try and extract a next hop
+            html_ish_resource = bool(
+                "html" in file_meta['mimetype']
+                or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
+                or "application/xml" in file_meta['mimetype']
+                or "text/xml" in file_meta['mimetype']
+            )
+            html_biblio = None
+            html_doc = None
+            if html_ish_resource and resource.body:
+                try:
+                    html_doc = HTMLParser(resource.body)
+                    html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
+                    if html_biblio:
+                        if not 'html_biblio' in result or html_biblio.title:
+                            result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True))
+                            #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
+                except ValueError:
+                    pass
+
+            # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
+            assert resource
+            assert resource.hit == True
+            assert resource.terminal_status_code in (200, 226)
+
+            if resource.terminal_url:
+                result['terminal'] = {
+                    "terminal_url": resource.terminal_url,
+                    "terminal_dt": resource.terminal_dt,
+                    "terminal_status_code": resource.terminal_status_code,
+                    "terminal_sha1hex": file_meta['sha1hex'],
+                }
+
+            result['file_meta'] = file_meta
+            result['cdx'] = cdx_to_dict(resource.cdx)
+            if resource.revisit_cdx:
+                result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
+
+            if ingest_type == "pdf":
+                if file_meta['mimetype'] != "application/pdf":
+                    result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
+                    return result
+            elif ingest_type == "xml":
+                if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
+                    result['status'] = "wrong-mimetype"
+                    return result
+            elif ingest_type == "html":
+                if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"):
+                    result['status'] = "wrong-mimetype"
+                    return result
+            else:
+                #raise NotImplementedError()
+                pass
 
         ### END COPYPASTA ###
 
@@ -252,11 +258,15 @@ class IngestFilesetWorker(IngestFileWorker):
             return result
 
         # 2. Use platform-specific methods to fetch manifest metadata and decide on an `ingest_strategy`.
-        dataset_meta = platform_helper.process_request(request, resource.terminal_url, html_biblio)
+        terminal_url = base_url
+        if resource:
+            terminal_url = resource.terminal_url
+        dataset_meta = platform_helper.process_request(request, terminal_url, html_biblio)
+        print(dataset_meta, file=sys.stderr)
         platform = dataset_meta.platform_name
-        result['platform'] = dataset_meta.platform
+        result['platform'] = dataset_meta.platform_name
         result['platform_id'] = dataset_meta.platform_id
-        result['item_name'] = dataset_meta.item_name
+        result['item_name'] = dataset_meta.archiveorg_item_name
         if not dataset_meta.manifest:
             result['status'] = 'no-manifest'
             return result
@@ -278,11 +288,11 @@ class IngestFilesetWorker(IngestFileWorker):
 
         # 4. Summarize status and return structured result metadata.
         result['status'] = archive_result.status
-        result['manifest'] = archive_result.manifest
+        result['manifest'] = [m.dict() for m in archive_result.manifest]
         result['file_count'] = len(archive_result.manifest) or None
         result['total_size'] = sum([m.size for m in archive_result.manifest if m.size]) or None
 
-        if result['status'] == 'success':
+        if result['status'].startswith('success'):
             result['hit'] = True
             print("[SUCCESS {:>5}] file_count={} total_size={}".format(
                 ingest_type,
-- 
cgit v1.2.3
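
For orientation, here is a rough sketch of how the pieces introduced by this patch appear intended to fit together inside IngestFilesetWorker.process(). This is not code from the patch: the function name, the fallback status strings, and the direct use of the module-level tables (rather than the worker's self.dataset_platform_helpers / self.dataset_strategy_archivers attributes) are illustrative assumptions.

# Hedged usage sketch of the platform-helper and strategy tables; not the actual worker code.
import sys

from sandcrawler.fileset_platforms import DATASET_PLATFORM_HELPER_TABLE
from sandcrawler.fileset_strategies import FILESET_STRATEGY_HELPER_TABLE


def ingest_fileset_sketch(request: dict, resource=None, html_biblio=None) -> dict:
    # 1. pick the first platform helper whose match_request() accepts this request
    platform_helper = None
    for helper in DATASET_PLATFORM_HELPER_TABLE.values():
        if helper.match_request(request, resource, html_biblio):
            platform_helper = helper
            break
    if platform_helper is None:
        # assumed fallback status, for illustration only
        return dict(request=request, hit=False, status='no-platform-match')

    # 2. fetch platform-specific metadata (file manifest, item identifiers, ...)
    item = platform_helper.process_request(request, resource, html_biblio)
    if not item.manifest:
        return dict(request=request, hit=False, status='no-manifest')

    # 3. let the helper choose an ingest strategy, then look up its archiver;
    #    only IngestStrategy.ArchiveorgFileset has an archiver registered in this patch
    strategy = platform_helper.chose_strategy(item)
    archiver = FILESET_STRATEGY_HELPER_TABLE.get(strategy)
    if archiver is None:
        return dict(request=request, hit=False, status='no-strategy-helper')

    # 4. archive the files (or detect an already-archived copy) and summarize
    archive_result = archiver.process(item)
    print(f" strategy={strategy} status={archive_result.status}", file=sys.stderr)
    return dict(
        request=request,
        hit=archive_result.status.startswith('success'),
        status=archive_result.status,
        platform=item.platform_name,
        platform_id=item.platform_id,
        file_count=len(archive_result.manifest) or None,
    )

The table-driven lookup keeps the worker agnostic of individual platforms: supporting a new source should only require registering another DatasetPlatformHelper and, if needed, another FilesetIngestStrategy implementation.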