aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/sandcrawler/fileset_platforms.py69
-rw-r--r--python/sandcrawler/fileset_strategies.py23
-rw-r--r--python/sandcrawler/fileset_types.py15
-rw-r--r--python/sandcrawler/ingest_fileset.py123
4 files changed, 148 insertions, 82 deletions
diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py
index 5f2f743..bcf2144 100644
--- a/python/sandcrawler/fileset_platforms.py
+++ b/python/sandcrawler/fileset_platforms.py
@@ -15,7 +15,7 @@ from sandcrawler.ia import ResourceResult
from sandcrawler.fileset_types import *
-class DatasetPlatformHelper():
+class FilesetPlatformHelper():
def __init__(self):
self.platform_name = 'unknown'
@@ -26,16 +26,16 @@ class DatasetPlatformHelper():
"""
raise NotImplementedError()
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
raise NotImplementedError()
- def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy:
+ def chose_strategy(self, item: FilesetPlatformItem) -> IngestStrategy:
assert item.manifest
- total_size = sum([m.size for m in item.manifest])
- largest_size = max([m.size for m in item.manifest])
+ total_size = sum([m.size for m in item.manifest]) or 0
+ largest_size = max([m.size or 0 for m in item.manifest]) or 0
#print(f" total_size={total_size} largest_size={largest_size}", file=sys.stderr)
# XXX: while developing ArchiveorgFileset path
#return IngestStrategy.ArchiveorgFileset
@@ -51,7 +51,7 @@ class DatasetPlatformHelper():
return IngestStrategy.ArchiveorgFileset
-class DataverseHelper(DatasetPlatformHelper):
+class DataverseHelper(FilesetPlatformHelper):
def __init__(self):
self.platform_name = 'dataverse'
@@ -133,10 +133,10 @@ class DataverseHelper(DatasetPlatformHelper):
components = urllib.parse.urlparse(url)
platform_domain = components.netloc.split(':')[0].lower()
params = urllib.parse.parse_qs(components.query)
- platform_id = params.get('persistentId')
- if not platform_id:
+ id_param = params.get('persistentId')
+ if not id_param:
return False
- platform_id = platform_id[0]
+ platform_id = id_param[0]
try:
parsed = self.parse_dataverse_persistentid(platform_id)
@@ -145,7 +145,7 @@ class DataverseHelper(DatasetPlatformHelper):
return True
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
@@ -162,14 +162,14 @@ class DataverseHelper(DatasetPlatformHelper):
components = urllib.parse.urlparse(url)
platform_domain = components.netloc.split(':')[0].lower()
params = urllib.parse.parse_qs(components.query)
- dataset_version = params.get('version')
- platform_id = params.get('persistentId')
- if not (platform_id and platform_id[0]):
+ id_param = params.get('persistentId')
+ if not (id_param and id_param[0]):
raise PlatformScopeError("Expected a Dataverse persistentId in URL")
- else:
- platform_id = platform_id[0]
- if type(dataset_version) == list:
- dataset_version = dataset_version[0]
+ platform_id = id_param[0]
+ version_param = params.get('version')
+ dataset_version = None
+ if version_param:
+ dataset_version = version_param[0]
try:
parsed_id = self.parse_dataverse_persistentid(platform_id)
@@ -243,7 +243,7 @@ class DataverseHelper(DatasetPlatformHelper):
if obj_latest.get('termsOfUse'):
archiveorg_item_meta['description'] += '\n<br>\n' + obj_latest['termsOfUse']
- return DatasetPlatformItem(
+ return FilesetPlatformItem(
platform_name=self.platform_name,
platform_status='success',
manifest=manifest,
@@ -321,18 +321,18 @@ def test_parse_dataverse_persistentid():
except ValueError:
pass
-class FigshareHelper(DatasetPlatformHelper):
+class FigshareHelper(FilesetPlatformHelper):
def __init__(self):
self.platform_name = 'figshare'
self.session = requests.Session()
@staticmethod
- def parse_figshare_url_path(path: str) -> List[str]:
+ def parse_figshare_url_path(path: str) -> Tuple[str, Optional[str]]:
"""
Tries to parse a figshare URL into ID number and (optional) version number.
- Returns a two-element list; version number will be None if not found
+ Returns a two-element tuple; version number will be None if not found
Raises a ValueError if not a figshare URL
"""
@@ -340,14 +340,14 @@ class FigshareHelper(DatasetPlatformHelper):
comp = path.split('/')
if len(comp) < 4 or comp[1] != 'articles':
- raise ValueError
+ raise ValueError(f"not a figshare URL: {path}")
if len(comp) == 5 and comp[3].isdigit() and comp[4].isdigit():
return (comp[3], comp[4])
elif len(comp) == 4 and comp[3].isdigit():
return (comp[3], None)
else:
- raise ValueError
+ raise ValueError(f"couldn't find figshare identiier: {path}")
def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
@@ -374,7 +374,7 @@ class FigshareHelper(DatasetPlatformHelper):
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -390,7 +390,7 @@ class FigshareHelper(DatasetPlatformHelper):
(platform_id, dataset_version) = self.parse_figshare_url_path(components.path)
assert platform_id.isdigit(), f"expected numeric: {platform_id}"
- assert dataset_version.isdigit(), f"expected numeric: {dataset_version}"
+ assert dataset_version and dataset_version.isdigit(), f"expected numeric: {dataset_version}"
# 1b. if we didn't get a version number from URL, fetch it from API
# TODO: implement this code path
@@ -436,7 +436,7 @@ class FigshareHelper(DatasetPlatformHelper):
version=obj['version'],
)
- return DatasetPlatformItem(
+ return FilesetPlatformItem(
platform_name=self.platform_name,
platform_status='success',
manifest=manifest,
@@ -471,7 +471,7 @@ def test_parse_figshare_url_path():
except ValueError:
pass
-class ZenodoHelper(DatasetPlatformHelper):
+class ZenodoHelper(FilesetPlatformHelper):
def __init__(self):
self.platform_name = 'zenodo'
@@ -490,7 +490,7 @@ class ZenodoHelper(DatasetPlatformHelper):
return True
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -567,7 +567,7 @@ class ZenodoHelper(DatasetPlatformHelper):
# obj['metadata']['version'] is, eg, git version tag
)
- return DatasetPlatformItem(
+ return FilesetPlatformItem(
platform_name=self.platform_name,
platform_status='success',
manifest=manifest,
@@ -581,7 +581,7 @@ class ZenodoHelper(DatasetPlatformHelper):
)
-class ArchiveOrgHelper(DatasetPlatformHelper):
+class ArchiveOrgHelper(FilesetPlatformHelper):
FORMAT_TO_MIMETYPE = {
'BZIP': 'application/x-bzip',
@@ -623,7 +623,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper):
self.session = internetarchive.get_session()
@staticmethod
- def want_item_file(f: dict, item_name: str) -> bool:
+ def want_item_file(f: internetarchive.File, item_name: str) -> bool:
"""
Filters IA API files
"""
@@ -662,7 +662,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper):
return True
return False
- def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+ def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
"""
Fetch platform-specific metadata for this request (eg, via API calls)
"""
@@ -700,7 +700,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper):
)
manifest.append(mf)
- return DatasetPlatformItem(
+ return FilesetPlatformItem(
platform_name=self.platform_name,
platform_status='success',
manifest=manifest,
@@ -710,10 +710,11 @@ class ArchiveOrgHelper(DatasetPlatformHelper):
archiveorg_meta=dict(collection=item_collection),
)
- def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy:
+ def chose_strategy(self, item: FilesetPlatformItem) -> IngestStrategy:
"""
Don't use default strategy picker; we are always doing an 'existing' in this case.
"""
+ assert item.manifest is not None
if len(item.manifest) == 1:
# NOTE: code flow does not support ArchiveorgFilesetBundle for the
# case of, eg, a single zipfile in an archive.org item
diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
index 43f1a53..2577d2b 100644
--- a/python/sandcrawler/fileset_strategies.py
+++ b/python/sandcrawler/fileset_strategies.py
@@ -12,7 +12,7 @@ import internetarchive
from sandcrawler.html_metadata import BiblioMetadata
from sandcrawler.ia import ResourceResult, WaybackClient, SavePageNowClient, fix_transfer_encoding
-from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, DatasetPlatformItem, ArchiveStrategyResult, PlatformScopeError
+from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, FilesetPlatformItem, ArchiveStrategyResult, PlatformScopeError
from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path
@@ -22,10 +22,10 @@ class FilesetIngestStrategy():
#self.ingest_strategy = 'unknown'
pass
- def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]:
+ def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]:
raise NotImplementedError()
- def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult:
+ def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
raise NotImplementedError()
@@ -44,7 +44,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
self.ia_session = internetarchive.get_session()
- def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]:
+ def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]:
"""
use API to check for item with all the files in the manifest
@@ -52,8 +52,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
"""
ia_item = self.ia_session.get_item(item.archiveorg_item_name)
if not ia_item.exists:
- return False
+ return None
item_files = ia_item.get_files(on_the_fly=False)
+ assert item.manifest
for wanted in item.manifest:
found = False
for existing in item_files:
@@ -74,7 +75,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
manifest=item.manifest,
)
- def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult:
+ def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
"""
May require extra context to pass along to archive.org item creation.
"""
@@ -94,6 +95,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
pass
# 1. download all files locally
+ assert item.manifest
for m in item.manifest:
# XXX: enforce safe/sane filename
@@ -143,7 +145,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
m.status = 'verified-local'
# 2. upload all files, with metadata
- assert item.archiveorg_item_meta['collection']
+ assert item.archiveorg_item_meta and item.archiveorg_item_meta['collection']
item_files = []
for m in item.manifest:
local_path = local_dir + '/' + m.path
@@ -212,7 +214,7 @@ class WebFilesetStrategy(FilesetIngestStrategy):
"://s3-eu-west-1.amazonaws.com/",
]
- def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult:
+ def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
"""
For each manifest item individually, run 'fetch_resource' and record stats, terminal_url, terminal_dt
@@ -220,6 +222,7 @@ class WebFilesetStrategy(FilesetIngestStrategy):
- full fetch_resource() method which can do SPN requests
"""
+ assert item.manifest
for m in item.manifest:
fetch_url = m.platform_url
if not fetch_url:
@@ -241,7 +244,7 @@ class WebFilesetStrategy(FilesetIngestStrategy):
print("[FETCH {:>6}] {} {}".format(
via,
(resource and resource.status),
- (resource and resource.terminal_url) or url),
+ (resource and resource.terminal_url) or fetch_url),
file=sys.stderr)
m.terminal_url = resource.terminal_url
@@ -268,7 +271,7 @@ class WebFilesetStrategy(FilesetIngestStrategy):
overall_status = "success"
for m in item.manifest:
if m.status != 'success':
- overall_status = m.status
+ overall_status = m.status or 'not-processed'
break
if not item.manifest:
overall_status = 'empty-manifest'
diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py
index 9fe8b0d..037843e 100644
--- a/python/sandcrawler/fileset_types.py
+++ b/python/sandcrawler/fileset_types.py
@@ -19,20 +19,20 @@ class FilesetManifestFile(BaseModel):
sha1: Optional[str]
sha256: Optional[str]
mimetype: Optional[str]
+ extra: Optional[Dict[str, Any]]
status: Optional[str]
platform_url: Optional[str]
terminal_url: Optional[str]
terminal_dt: Optional[str]
- extra: Optional[Dict[str, Any]]
-class DatasetPlatformItem(BaseModel):
+class FilesetPlatformItem(BaseModel):
platform_name: str
platform_status: str
- manifest: Optional[List[FilesetManifestFile]]
-
platform_domain: Optional[str]
platform_id: Optional[str]
+ manifest: Optional[List[FilesetManifestFile]]
+
archiveorg_item_name: Optional[str]
archiveorg_item_meta: Optional[dict]
web_base_url: Optional[str]
@@ -42,6 +42,13 @@ class ArchiveStrategyResult(BaseModel):
ingest_strategy: str
status: str
manifest: List[FilesetManifestFile]
+ file_file_meta: Optional[Dict[str, Any]]
+ file_terminal: Optional[Dict[str, Any]]
+ file_cdx: Optional[Dict[str, Any]]
+ bundle_file_meta: Optional[Dict[str, Any]]
+ bundle_terminal: Optional[Any]
+ bundle_cdx: Optional[Any]
+ bundle_archiveorg_path: Optional[str]
class PlatformScopeError(Exception):
"""
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index ce6cdca..de392b2 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -20,7 +20,7 @@ from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, ht
from sandcrawler.workers import SandcrawlerWorker
from sandcrawler.db import SandcrawlerPostgrestClient
from sandcrawler.ingest_file import IngestFileWorker
-from sandcrawler.fileset_platforms import DatasetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE
+from sandcrawler.fileset_platforms import FilesetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE
from sandcrawler.fileset_strategies import FilesetIngestStrategy, FILESET_STRATEGY_HELPER_TABLE
from sandcrawler.fileset_types import PlatformScopeError, PlatformRestrictedError
@@ -47,8 +47,8 @@ class IngestFilesetWorker(IngestFileWorker):
self.sink = sink
self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE
self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
- self.max_total_size = 100*1024*1024*1024
- self.max_file_count = 500
+ self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024)
+ self.max_file_count = kwargs.get('max_file_count', 200)
def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
@@ -79,34 +79,15 @@ class IngestFilesetWorker(IngestFileWorker):
return False
return True
- def process(self, request: dict, key: Any = None) -> dict:
-
- ingest_type = request.get('ingest_type')
- if ingest_type not in ("dataset",):
- raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
-
- # parse/clean URL
- # note that we pass through the original/raw URL, and that is what gets
- # persisted in database table
- base_url = clean_url(request['base_url'])
-
- force_recrawl = bool(request.get('force_recrawl', False))
-
- print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
-
- # TODO: "existing" check for new fileset ingest result table
- #existing = self.check_existing_ingest(ingest_type, base_url)
- #if existing:
- # return self.process_existing(request, existing)
+ def fetch_resource_iteratively(self, ingest_type: str, base_url: str, force_recrawl: bool) -> dict:
+ """
+ This is copypasta from process_file(), should probably refactor.
+ """
- result: Dict[str, Any] = dict(request=request, hit=False)
+ result: Dict[str, Any] = dict(hit=False)
result['hops'] = [base_url]
next_url = base_url
- # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
-
- ### START COPYPASTA from process_file(), should refactor ###
-
# check against blocklist
for block in self.base_url_blocklist:
# XXX: hack to not skip archive.org content
@@ -247,9 +228,42 @@ class IngestFilesetWorker(IngestFileWorker):
#raise NotImplementedError()
pass
- ### END COPYPASTA ###
+ result['_html_biblio'] = html_biblio
+ result['_resource'] = resource
+ return result
+
+
+ def process(self, request: dict, key: Any = None) -> dict:
+
+ ingest_type = request.get('ingest_type')
+ if ingest_type not in ("dataset",):
+ raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
+
+ # parse/clean URL
+ # note that we pass through the original/raw URL, and that is what gets
+ # persisted in database table
+ base_url = clean_url(request['base_url'])
+
+ force_recrawl = bool(request.get('force_recrawl', False))
+
+ print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
+
+ # TODO: "existing" check against file and/or fileset ingest result table
+ #existing = self.check_existing_ingest(ingest_type, base_url)
+ #if existing:
+ # return self.process_existing(request, existing)
+
+ result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl)
+ if result.get('status') != None:
+ result['request'] = request
+ return result
+
+ html_biblio = result.pop('_html_biblio')
+ resource = result.pop('_resource')
+
+ # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
- # XXX: html_guess_platform()
+ # TODO: could involve html_guess_platform() here?
# determine platform
platform_helper = None
@@ -291,10 +305,10 @@ class IngestFilesetWorker(IngestFileWorker):
#print(dataset_meta, file=sys.stderr)
platform = dataset_meta.platform_name
- result['platform'] = dataset_meta.platform_name
+ result['platform_name'] = dataset_meta.platform_name
+ result['platform_domain'] = dataset_meta.platform_domain
result['platform_id'] = dataset_meta.platform_id
- result['item_name'] = dataset_meta.archiveorg_item_name
- result['item_meta'] = dataset_meta.archiveorg_item_meta
+ result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name
if not dataset_meta.manifest:
result['status'] = 'empty-manifest'
@@ -309,6 +323,9 @@ class IngestFilesetWorker(IngestFileWorker):
result['status'] = 'too-large-size'
return result
if result['file_count'] > self.max_file_count:
+ # hard max, to prevent downstream breakage
+ if result['file_count'] > 10*1000:
+ result['manifest'] = result['manifest'][:self.max_file_count]
result['status'] = 'too-many-files'
return result
@@ -327,8 +344,46 @@ class IngestFilesetWorker(IngestFileWorker):
# 4. Summarize status and return structured result metadata.
result['status'] = archive_result.status
result['manifest'] = [m.dict() for m in archive_result.manifest]
- result['file_count'] = len(archive_result.manifest) or None
- result['total_size'] = sum([m.size for m in archive_result.manifest if m.size]) or None
+
+ if ingest_strategy.endswith('-fileset-bundle'):
+ result['fileset_bundle'] = dict(
+ file_meta=archive_result.bundle_file_meta,
+ archiveorg_bundle_path=archive_result.archiveorg_bundle_path,
+ )
+ if archive_result.bundle_terminal:
+ result['fileset_bundle']['terminal'] = dict(
+ terminal_url=archive_result.bundle_terminal.terminal_url,
+ terminal_dt=archive_result.bundle_terminal.terminal_dt,
+ terminal_status_code=archive_result.bundle_terminal.terminal_status_code,
+ )
+ if archive_result.bundle_cdx:
+ result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_cdx)
+ if archive_result.bundle_cdx.revisit_cdx:
+ result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_cdx.revisit_cdx)
+ if ingest_strategy.endswith('-file'):
+ result['fileset_file'] = dict(
+ file_meta=archive_result.file_file_meta,
+ )
+ if archive_result.file_terminal:
+ result['fileset_file']['terminal'] = dict(
+ terminal_url=archive_result.file_terminal.terminal_url,
+ terminal_dt=archive_result.file_terminal.terminal_dt,
+ terminal_status_code=archive_result.file_terminal.terminal_status_code,
+ )
+ if archive_result.file_cdx:
+ result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_cdx)
+ if archive_result.file_cdx.revisit_cdx:
+ result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_cdx.revisit_cdx)
+
+ if result['status'].startswith('success'):
+ # check that these are still valid
+ assert result['file_count'] == len(archive_result.manifest)
+ assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size])
+
+
+ # XXX: here is where we would publish to ingest file result topic... or call process_hit()?
+ if result['status'] == 'success-file':
+ pass
if result['status'].startswith('success'):
result['hit'] = True