author    Bryan Newbold <bnewbold@archive.org>  2021-10-04 16:13:55 -0700
committer Bryan Newbold <bnewbold@archive.org>  2021-10-15 18:15:25 -0700
commit    b3447503c0aa2e326ce1e46c993be28f907ec23b (patch)
tree      4bd3de9016ecc95e38de5c75e6fd69b5ce26f74c
parent    147319ae00a6b788104209083f65cbaa4329c862 (diff)
download  sandcrawler-b3447503c0aa2e326ce1e46c993be28f907ec23b.tar.gz
          sandcrawler-b3447503c0aa2e326ce1e46c993be28f907ec23b.zip
progress on dataset ingest
-rw-r--r--  python/sandcrawler/fileset_platforms.py   | 171
-rw-r--r--  python/sandcrawler/fileset_strategies.py  |  57
-rw-r--r--  python/sandcrawler/fileset_types.py       |   1
-rw-r--r--  python/sandcrawler/ingest_fileset.py      | 226
4 files changed, 333 insertions(+), 122 deletions(-)
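
The patch wires together platform detection (fileset_platforms), archiving strategies (fileset_strategies), and the worker glue (ingest_fileset). A minimal sketch of that flow, using only the tables and methods added in the diffs below; the standalone function and the bare request dict are hypothetical, and the real IngestFilesetWorker.process() additionally handles blocklists, wayback/SPN fetching, and HTML biblio extraction:

    from sandcrawler.fileset_platforms import DATASET_PLATFORM_HELPER_TABLE
    from sandcrawler.fileset_strategies import FILESET_STRATEGY_HELPER_TABLE

    def ingest_fileset_sketch(request: dict) -> dict:
        # 1. pick the first platform helper that claims this request
        helper = None
        for candidate in DATASET_PLATFORM_HELPER_TABLE.values():
            if candidate.match_request(request, None, None):
                helper = candidate
                break
        if not helper:
            return dict(request=request, hit=False, status='no-platform-match')

        # 2. fetch platform metadata and build a file manifest
        item = helper.process_request(request, None, None)

        # 3. map the item to an ingest strategy and archive it
        #    (assumes a strategy is registered for the chosen IngestStrategy)
        strategy = FILESET_STRATEGY_HELPER_TABLE[helper.chose_strategy(item)]
        archive_result = strategy.process(item)

        # 4. summarize
        return dict(
            request=request,
            hit=archive_result.status.startswith('success'),
            status=archive_result.status,
            platform=item.platform_name,
            file_count=len(archive_result.manifest or []),
        )
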
diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py
index 7aeacf2..5342a4e 100644
--- a/python/sandcrawler/fileset_platforms.py
+++ b/python/sandcrawler/fileset_platforms.py
@@ -6,34 +6,189 @@ import time
from collections import namedtuple
from typing import Optional, Tuple, Any, Dict, List
+import internetarchive
+
from sandcrawler.html_metadata import BiblioMetadata
from sandcrawler.ia import ResourceResult
+from sandcrawler.fileset_types import *
-class DatasetPlatformHelper(class):
+class DatasetPlatformHelper():
- def __init__():
+ def __init__(self):
self.platform_name = 'unknown'
- def match_request(request: dict , resource: ResourceResult, html_biblio: Optional[BiblioMetadata]) -> bool:
+ def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
"""
Does this request look like it matches this platform?
"""
- raise NotImplemented
+ raise NotImplementedError()
- def get_item(request: dict, resource: ResourceResult, html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
- raise NotImplemented
+ raise NotImplementedError()
+
+ def chose_strategy(self, DatasetPlatformItem) -> IngestStrategy:
+ raise NotImplementedError()
class DataverseHelper(DatasetPlatformHelper):
- def __init__():
+ def __init__(self):
self.platform_name = 'dataverse'
+ def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ return False
+
+ def chose_strategy(self, DatasetPlatformItem) -> IngestStrategy:
+ raise NotImplementedError()
+
+
class ArchiveOrgHelper(DatasetPlatformHelper):
- def __init__():
+ FORMAT_TO_MIMETYPE = {
+ 'BZIP': 'application/x-bzip',
+ 'BZIP2': 'application/x-bzip2',
+ 'ZIP': 'application/zip',
+ 'GZIP': 'application/gzip',
+ 'RAR': 'application/vnd.rar',
+ 'TAR': 'application/x-tar',
+ '7z': 'application/x-7z-compressed',
+
+ 'HTML': 'text/html',
+ 'Text': 'text/plain',
+ 'PDF': 'application/pdf',
+
+ 'CSV': 'text/csv',
+ 'XML': 'application/xml',
+ 'JSON': 'application/json',
+
+ #'application/msword (.doc)', # .doc
+ #'application/vnd.openxmlformats-officedocument.wordprocessingml.document', # .docx
+ #'application/vnd.ms-excel', # .xls
+ #'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx
+
+ 'MP3': 'audio/mpeg', # .mp3
+
+ 'MP4': 'video/mp4', # .mp4
+ 'MPEG': 'video/mpeg', # .mpeg
+
+ 'JPEG': 'image/jpeg',
+ 'GIF': 'image/gif',
+ 'PNG': 'image/png',
+ 'TIFF': 'image/tiff',
+
+ 'Unknown': None,
+ }
+
+ def __init__(self):
self.platform_name = 'archiveorg'
+ self.session = internetarchive.get_session()
+
+ @staticmethod
+ def want_item_file(f: dict, item_name: str) -> bool:
+ """
+ Filters IA API files
+ """
+ if f.source != 'original':
+ return False
+ for suffix in [
+ '_meta.sqlite',
+ '_archive.torrent',
+ '_itemimage.jpg',
+ '_meta.xml',
+ '_thumb.png',
+ '_files.xml',
+ ]:
+ if f.name == item_name + suffix or f.name == item_name.lower() + suffix:
+ return False
+ if f.name.startswith('_'):
+ return False
+ if item_name.startswith('academictorrents_'):
+ for suffix in ['_academictorrents.torrent', '_academictorrents_torrent.txt', '.bib']:
+ if f.name == item_name + suffix:
+ return False
+ return True
+
+ def parse_item_file(self, f: dict) -> FilesetManifestFile:
+ """
+ Takes an IA API file and turns it in to a fatcat fileset manifest file
+ """
+ assert f.name and f.sha1 and f.md5
+ assert f.name is not None
+ mf = {
+ 'path': f.name,
+ 'size': int(f.size),
+ 'sha1': f.sha1,
+ 'md5': f.md5,
+ }
+ # TODO: will disable this hard check eventually and replace with:
+ #mimetype = FORMAT_TO_MIMETYPE.get(f.format)
+ mimetype = self.FORMAT_TO_MIMETYPE[f.format]
+ if mimetype:
+ mf['extra'] = dict(mimetype=mimetype)
+ return mf
+
+
+ def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ patterns = [
+ '://archive.org/details/',
+ '://archive.org/download/',
+ ]
+ for p in patterns:
+ if p in request['base_url']:
+ return True
+ return False
+
+ def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+ """
+ Fetch platform-specific metadata for this request (eg, via API calls)
+
+ XXX: add platform_url (for direct download)
+ """
+
+ base_url_split = request['base_url'].split('/')
+ #print(base_url_split, file=sys.stderr)
+ assert len(base_url_split) == 5
+ assert base_url_split[0] in ['http:', 'https:']
+ assert base_url_split[2] == 'archive.org'
+ assert base_url_split[3] in ['details', 'download']
+ item_name = base_url_split[4]
+
+ print(f" archiveorg processing item={item_name}", file=sys.stderr)
+ item = self.session.get_item(item_name)
+ item_name = item.identifier
+ item_collection = item.metadata['collection']
+ if type(item_collection) == list:
+ item_collection = item_collection[0]
+ assert item.metadata['mediatype'] not in ['collection', 'web']
+ item_files = item.get_files(on_the_fly=False)
+ manifest = [self.parse_item_file(f) for f in item_files if self.want_item_file(f, item_name)]
+
+ return DatasetPlatformItem(
+ platform_name=self.platform_name,
+ platform_status='success',
+ manifest=manifest,
+ platform_domain='archive.org',
+ platform_id=item_name,
+ archiveorg_item_name=item_name,
+ archiveorg_collection=item_collection,
+ )
+
+ def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy:
+ if len(item.manifest) == 1:
+ # NOTE: code flow does not support ArchiveorgFilesetBundle for the
+ # case of, eg, a single zipfile in an archive.org item
+ return IngestStrategy.ArchiveorgFile
+ elif len(item.manifest) >= 1:
+ return IngestStrategy.ArchiveorgFileset
+ else:
+ raise NotImplementedError()
+
+
+DATASET_PLATFORM_HELPER_TABLE = {
+ 'dataverse': DataverseHelper(),
+ 'archiveorg': ArchiveOrgHelper(),
+}
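
For illustration (not part of the patch), a sketch of exercising ArchiveOrgHelper on its own; the item identifier below is a placeholder, and process_request() hits the live archive.org metadata API through the internetarchive session:

    from sandcrawler.fileset_platforms import ArchiveOrgHelper
    from sandcrawler.fileset_types import IngestStrategy

    helper = ArchiveOrgHelper()
    request = {'base_url': 'https://archive.org/details/SOME_ITEM_IDENTIFIER'}  # placeholder

    if helper.match_request(request, None, None):
        item = helper.process_request(request, None, None)  # fetches item metadata + file list
        strategy = helper.chose_strategy(item)               # File for a 1-file manifest, else Fileset
        assert strategy in (IngestStrategy.ArchiveorgFile, IngestStrategy.ArchiveorgFileset)
        print(item.platform_id, len(item.manifest), strategy)
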
diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
index 592b475..26bc5ad 100644
--- a/python/sandcrawler/fileset_strategies.py
+++ b/python/sandcrawler/fileset_strategies.py
@@ -6,17 +6,62 @@ import time
from collections import namedtuple
from typing import Optional, Tuple, Any, Dict, List
+import internetarchive
+
from sandcrawler.html_metadata import BiblioMetadata
from sandcrawler.ia import ResourceResult
-from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, DatasetPlatformItem
+from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, DatasetPlatformItem, ArchiveStrategyResult
+
+
+class FilesetIngestStrategy():
+
+ def __init__(self):
+ #self.ingest_strategy = 'unknown'
+ pass
+
+ def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]:
+ raise NotImplementedError()
+
+ def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult:
+ raise NotImplementedError()
+
+
+class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
+ def __init__(self):
+ self.ingest_strategy = IngestStrategy.ArchiveorgFileset
+ self.session = internetarchive.get_session()
-class FilesetIngestStrategy(class):
+ def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]:
+ """
+ use API to check for item with all the files in the manifest
+ TODO: this naive comparison is quadratic in number of files, aka O(N^2)
- def __init__():
- self.ingest_strategy = 'unknown'
+ XXX: should this verify sha256 and/or mimetype?
+ """
+ ia_item = self.session.get_item(item.archiveorg_item_name)
+ item_files = ia_item.get_files(on_the_fly=False)
+ for wanted in item.manifest:
+ found = False
+ for existing in item_files:
+ if existing.sha1 == wanted.sha1 and existing.name == wanted.path and existing.size == wanted.size:
+ found = True
+ break
+ if not found:
+ print(f" didn't find at least one file: {wanted}", file=sys.stderr)
+ return None
+ return ArchiveStrategyResult(
+ ingest_strategy=self.ingest_strategy,
+ status='success-existing',
+ manifest=item.manifest,
+ )
- def check_existing(): # XXX: -> Any:
+ def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult:
+ existing = self.check_existing(item)
+ if existing:
+ return existing
raise NotImplementedError()
- def process(item: DatasetPlatformItem):
+FILESET_STRATEGY_HELPER_TABLE = {
+ IngestStrategy.ArchiveorgFileset: ArchiveorgFilesetStrategy(),
+}
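
The check_existing() docstring flags the nested-loop manifest comparison as quadratic; a possible replacement (a sketch, not part of the patch) indexes the archive.org files by name first, assuming the same attributes (name, sha1, size) used above:

    def manifest_files_present(manifest, ia_files) -> bool:
        # single pass: index existing IA files by name, then look each wanted file up
        by_name = {f.name: f for f in ia_files}
        for wanted in manifest:
            existing = by_name.get(wanted.path)
            if not existing:
                return False
            if existing.sha1 != wanted.sha1 or int(existing.size) != int(wanted.size):
                return False
        return True
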
diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py
index f0f03db..75f4992 100644
--- a/python/sandcrawler/fileset_types.py
+++ b/python/sandcrawler/fileset_types.py
@@ -1,5 +1,6 @@
from enum import Enum
+from typing import Optional, Tuple, Any, Dict, List
from pydantic import BaseModel
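
The hunk above only adds typing imports; for context, a sketch of the pydantic models this module is assumed to define, reconstructed from how they are instantiated elsewhere in this patch (field optionality and exact types are guesses):

    class FilesetManifestFile(BaseModel):
        path: str
        size: Optional[int]
        md5: Optional[str]
        sha1: Optional[str]
        extra: Optional[Dict[str, Any]]

    class DatasetPlatformItem(BaseModel):
        platform_name: str
        platform_status: str
        manifest: Optional[List[FilesetManifestFile]]
        platform_domain: Optional[str]
        platform_id: Optional[str]
        archiveorg_item_name: Optional[str]
        archiveorg_collection: Optional[str]

    class ArchiveStrategyResult(BaseModel):
        ingest_strategy: str
        status: str
        manifest: Optional[List[FilesetManifestFile]]
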
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index 9ffaa47..50e3c05 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -12,13 +12,16 @@ from selectolax.parser import HTMLParser
from sandcrawler.ia import SavePageNowClient, CdxApiClient, WaybackClient, WaybackError, WaybackContentError, SavePageNowError, CdxApiError, PetaboxError, cdx_to_dict, ResourceResult, fix_transfer_encoding, NoCaptureError
from sandcrawler.misc import gen_file_metadata, clean_url, parse_cdx_datetime
from sandcrawler.html import extract_fulltext_url
-from sandcrawler.html_ingest import fetch_html_resources, \
+from sandcrawler.ingest_html import fetch_html_resources, \
quick_fetch_html_resources, html_guess_scope, html_extract_body_teixml, \
WebResource, html_guess_platform
from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, html_extract_biblio, load_adblock_rules
from sandcrawler.workers import SandcrawlerWorker
from sandcrawler.db import SandcrawlerPostgrestClient
+from sandcrawler.ingest_file import IngestFileWorker
+from sandcrawler.fileset_platforms import DatasetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE
+from sandcrawler.fileset_strategies import FilesetIngestStrategy, FILESET_STRATEGY_HELPER_TABLE
MAX_BODY_SIZE_BYTES = 128*1024*1024
@@ -41,6 +44,8 @@ class IngestFilesetWorker(IngestFileWorker):
super().__init__(sink=None, **kwargs)
self.sink = sink
+ self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE
+ self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
@@ -84,20 +89,16 @@ class IngestFilesetWorker(IngestFileWorker):
force_recrawl = bool(request.get('force_recrawl', False))
- for block in self.base_url_blocklist:
- if block in base_url:
- print("[SKIP {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
- return dict(request=request, hit=False, status="skip-url-blocklist")
-
print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
- # TODO
+ # TODO: "existing" check for new fileset ingest result table
#existing = self.check_existing_ingest(ingest_type, base_url)
#if existing:
# return self.process_existing(request, existing)
result: Dict[str, Any] = dict(request=request, hit=False)
- hops = [base_url]
+ result['hops'] = [base_url]
+ next_url = base_url
# 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
@@ -105,12 +106,15 @@ class IngestFilesetWorker(IngestFileWorker):
# check against blocklist
for block in self.base_url_blocklist:
+ # XXX: hack to not skip archive.org content
+ if 'archive.org' in block:
+ continue
if block in next_url:
result['status'] = "skip-url-blocklist"
return result
try:
- resource = self.find_resource(next_url, best_mimetype, force_recrawl=force_recrawl)
+ resource = self.find_resource(next_url, force_recrawl=force_recrawl)
except SavePageNowError as e:
result['status'] = 'spn2-error'
result['error_message'] = str(e)[:1600]
@@ -134,109 +138,111 @@ class IngestFilesetWorker(IngestFileWorker):
result['error_message'] = str(e)[:1600]
return result
except NotImplementedError as e:
- result['status'] = 'not-implemented'
- result['error_message'] = str(e)[:1600]
- return result
-
- assert resource
-
- if resource.terminal_url:
- result['terminal'] = {
- "terminal_url": resource.terminal_url,
- "terminal_dt": resource.terminal_dt,
- "terminal_status_code": resource.terminal_status_code,
- }
- if resource.terminal_url not in result['hops']:
- result['hops'].append(resource.terminal_url)
-
- if not resource.hit:
- result['status'] = resource.status
- return result
+ #result['status'] = 'not-implemented'
+ #result['error_message'] = str(e)[:1600]
+ #return result
+ resource = None
- if resource.terminal_url:
- for pattern in self.base_url_blocklist:
- if pattern in resource.terminal_url:
- result['status'] = 'skip-url-blocklist'
- return result
-
- if resource.terminal_url:
- for pattern in self.cookie_blocklist:
- if pattern in resource.terminal_url:
- result['status'] = 'blocked-cookie'
- return result
+ html_biblio = None
+ if resource:
+ if resource.terminal_url:
+ result['terminal'] = {
+ "terminal_url": resource.terminal_url,
+ "terminal_dt": resource.terminal_dt,
+ "terminal_status_code": resource.terminal_status_code,
+ }
+ if resource.terminal_url not in result['hops']:
+ result['hops'].append(resource.terminal_url)
+
+ if not resource.hit:
+ result['status'] = resource.status
+ return result
- if not resource.body:
- result['status'] = 'null-body'
- return result
+ if resource.terminal_url:
+ for pattern in self.base_url_blocklist:
+ if pattern in resource.terminal_url:
+ result['status'] = 'skip-url-blocklist'
+ return result
- if len(resource.body) > MAX_BODY_SIZE_BYTES:
- result['status'] = 'body-too-large'
- return result
+ if resource.terminal_url:
+ for pattern in self.cookie_blocklist:
+ if pattern in resource.terminal_url:
+ result['status'] = 'blocked-cookie'
+ return result
- file_meta = gen_file_metadata(resource.body)
- try:
- file_meta, resource = fix_transfer_encoding(file_meta, resource)
- except Exception as e:
- result['status'] = 'bad-gzip-encoding'
- result['error_message'] = str(e)
- return result
+ if not resource.body:
+ result['status'] = 'null-body'
+ return result
- if not resource.body or file_meta['size_bytes'] == 0:
- result['status'] = 'null-body'
- return result
+ if len(resource.body) > MAX_BODY_SIZE_BYTES:
+ result['status'] = 'body-too-large'
+ return result
- # here we split based on ingest type to try and extract a next hop
- html_ish_resource = bool(
- "html" in file_meta['mimetype']
- or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
- or "application/xml" in file_meta['mimetype']
- or "text/xml" in file_meta['mimetype']
- )
- html_biblio = None
- html_doc = None
- if html_ish_resource and resource.body:
+ file_meta = gen_file_metadata(resource.body)
try:
- html_doc = HTMLParser(resource.body)
- html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
- if html_biblio:
- if not 'html_biblio' in result or html_biblio.title:
- result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True))
- #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
- except ValueError:
- pass
-
- # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
- assert resource
- assert resource.hit == True
- assert resource.terminal_status_code in (200, 226)
-
- if resource.terminal_url:
- result['terminal'] = {
- "terminal_url": resource.terminal_url,
- "terminal_dt": resource.terminal_dt,
- "terminal_status_code": resource.terminal_status_code,
- "terminal_sha1hex": file_meta['sha1hex'],
- }
-
- result['file_meta'] = file_meta
- result['cdx'] = cdx_to_dict(resource.cdx)
- if resource.revisit_cdx:
- result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
-
- if ingest_type == "pdf":
- if file_meta['mimetype'] != "application/pdf":
- result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
- return result
- elif ingest_type == "xml":
- if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
- result['status'] = "wrong-mimetype"
+ file_meta, resource = fix_transfer_encoding(file_meta, resource)
+ except Exception as e:
+ result['status'] = 'bad-gzip-encoding'
+ result['error_message'] = str(e)
return result
- elif ingest_type == "html":
- if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"):
- result['status'] = "wrong-mimetype"
+
+ if not resource.body or file_meta['size_bytes'] == 0:
+ result['status'] = 'null-body'
return result
- else:
- raise NotImplementedError()
+
+ # here we split based on ingest type to try and extract a next hop
+ html_ish_resource = bool(
+ "html" in file_meta['mimetype']
+ or "xhtml" in file_meta['mimetype'] # matches "application/xhtml+xml"
+ or "application/xml" in file_meta['mimetype']
+ or "text/xml" in file_meta['mimetype']
+ )
+ html_biblio = None
+ html_doc = None
+ if html_ish_resource and resource.body:
+ try:
+ html_doc = HTMLParser(resource.body)
+ html_biblio = html_extract_biblio(resource.terminal_url, html_doc)
+ if html_biblio:
+ if not 'html_biblio' in result or html_biblio.title:
+ result['html_biblio'] = json.loads(html_biblio.json(exclude_none=True))
+ #print(f" setting html_biblio: {result['html_biblio']}", file=sys.stderr)
+ except ValueError:
+ pass
+
+ # fetch must be a hit if we got this far (though not necessarily an ingest hit!)
+ assert resource
+ assert resource.hit == True
+ assert resource.terminal_status_code in (200, 226)
+
+ if resource.terminal_url:
+ result['terminal'] = {
+ "terminal_url": resource.terminal_url,
+ "terminal_dt": resource.terminal_dt,
+ "terminal_status_code": resource.terminal_status_code,
+ "terminal_sha1hex": file_meta['sha1hex'],
+ }
+
+ result['file_meta'] = file_meta
+ result['cdx'] = cdx_to_dict(resource.cdx)
+ if resource.revisit_cdx:
+ result['revisit_cdx'] = cdx_to_dict(resource.revisit_cdx)
+
+ if ingest_type == "pdf":
+ if file_meta['mimetype'] != "application/pdf":
+ result['status'] = "wrong-mimetype" # formerly: "other-mimetype"
+ return result
+ elif ingest_type == "xml":
+ if file_meta['mimetype'] not in ("application/xml", "text/xml", "application/jats+xml"):
+ result['status'] = "wrong-mimetype"
+ return result
+ elif ingest_type == "html":
+ if file_meta['mimetype'] not in ("text/html", "application/xhtml+xml"):
+ result['status'] = "wrong-mimetype"
+ return result
+ else:
+ #raise NotImplementedError()
+ pass
### END COPYPASTA ###
@@ -252,11 +258,15 @@ class IngestFilesetWorker(IngestFileWorker):
return result
# 2. Use platform-specific methods to fetch manifest metadata and decide on an `ingest_strategy`.
- dataset_meta = platform_helper.process_request(request, resource.terminal_url, html_biblio)
+ terminal_url = base_url
+ if resource:
+ terminal_url = resource.terminal_url
+ dataset_meta = platform_helper.process_request(request, terminal_url, html_biblio)
+ print(dataset_meta, file=sys.stderr)
platform = dataset_meta.platform_name
- result['platform'] = dataset_meta.platform
+ result['platform'] = dataset_meta.platform_name
result['platform_id'] = dataset_meta.platform_id
- result['item_name'] = dataset_meta.item_name
+ result['item_name'] = dataset_meta.archiveorg_item_name
if not dataset_meta.manifest:
result['status'] = 'no-manifest'
return result
@@ -278,11 +288,11 @@ class IngestFilesetWorker(IngestFileWorker):
# 4. Summarize status and return structured result metadata.
result['status'] = archive_result.status
- result['manifest'] = archive_result.manifest
+ result['manifest'] = [m.dict() for m in archive_result.manifest]
result['file_count'] = len(archive_result.manifest) or None
result['total_size'] = sum([m.size for m in archive_result.manifest if m.size]) or None
- if result['status'] == 'success':
+ if result['status'].startswith('success'):
result['hit'] = True
print("[SUCCESS {:>5}] file_count={} total_size={}".format(
ingest_type,