Diffstat (limited to 'python/sandcrawler/ingest_fileset.py')
-rw-r--r--  python/sandcrawler/ingest_fileset.py | 123
1 file changed, 89 insertions(+), 34 deletions(-)
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index ce6cdca..de392b2 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -20,7 +20,7 @@ from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, ht
from sandcrawler.workers import SandcrawlerWorker
from sandcrawler.db import SandcrawlerPostgrestClient
from sandcrawler.ingest_file import IngestFileWorker
-from sandcrawler.fileset_platforms import DatasetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE
+from sandcrawler.fileset_platforms import FilesetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE
from sandcrawler.fileset_strategies import FilesetIngestStrategy, FILESET_STRATEGY_HELPER_TABLE
from sandcrawler.fileset_types import PlatformScopeError, PlatformRestrictedError
@@ -47,8 +47,8 @@ class IngestFilesetWorker(IngestFileWorker):
self.sink = sink
self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE
self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
- self.max_total_size = 100*1024*1024*1024
- self.max_file_count = 500
+ self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024)
+ self.max_file_count = kwargs.get('max_file_count', 200)
def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
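With this hunk the total-size and file-count caps become per-worker configuration rather than hard-coded constants, and the defaults tighten from 100 GiB / 500 files to 64 GiB / 200 files. A minimal construction sketch, assuming only the kwargs visible above (the rest of the constructor signature is not shown in this diff):

# Hypothetical usage; only max_total_size and max_file_count are confirmed
# by this diff, the remaining constructor arguments are assumptions.
worker = IngestFilesetWorker(
    sink=None,
    max_total_size=16*1024*1024*1024,  # tighten the cap to 16 GiB
    max_file_count=50,                 # reject filesets with more than 50 files
)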
@@ -79,34 +79,15 @@ class IngestFilesetWorker(IngestFileWorker):
return False
return True
- def process(self, request: dict, key: Any = None) -> dict:
-
- ingest_type = request.get('ingest_type')
- if ingest_type not in ("dataset",):
- raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
-
- # parse/clean URL
- # note that we pass through the original/raw URL, and that is what gets
- # persisted in database table
- base_url = clean_url(request['base_url'])
-
- force_recrawl = bool(request.get('force_recrawl', False))
-
- print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
-
- # TODO: "existing" check for new fileset ingest result table
- #existing = self.check_existing_ingest(ingest_type, base_url)
- #if existing:
- # return self.process_existing(request, existing)
+ def fetch_resource_iteratively(self, ingest_type: str, base_url: str, force_recrawl: bool) -> dict:
+ """
+ This is copypasta from process_file(), should probably refactor.
+ """
- result: Dict[str, Any] = dict(request=request, hit=False)
+ result: Dict[str, Any] = dict(hit=False)
result['hops'] = [base_url]
next_url = base_url
- # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
-
- ### START COPYPASTA from process_file(), should refactor ###
-
# check against blocklist
for block in self.base_url_blocklist:
# XXX: hack to not skip archive.org content
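The blocklist loop that opens fetch_resource_iteratively() keeps the "XXX" carve-out for archive.org noted above. A sketch of the shape of that check, assuming block entries are substrings matched against the next URL:

# Sketch only: the matching rule and the archive.org test are assumptions
# based on the comment above, not confirmed by the visible context lines.
for block in self.base_url_blocklist:
    if 'archive.org' in block:
        continue  # hack to not skip archive.org content
    if block in next_url:
        result['status'] = 'skip-url-blocklist'
        return result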
@@ -247,9 +228,42 @@ class IngestFilesetWorker(IngestFileWorker):
#raise NotImplementedError()
pass
- ### END COPYPASTA ###
+ result['_html_biblio'] = html_biblio
+ result['_resource'] = resource
+ return result
+
+
+ def process(self, request: dict, key: Any = None) -> dict:
+
+ ingest_type = request.get('ingest_type')
+ if ingest_type not in ("dataset",):
+ raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
+
+ # parse/clean URL
+ # note that we pass through the original/raw URL, and that is what gets
+ # persisted in database table
+ base_url = clean_url(request['base_url'])
+
+ force_recrawl = bool(request.get('force_recrawl', False))
+
+ print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
+
+ # TODO: "existing" check against file and/or fileset ingest result table
+ #existing = self.check_existing_ingest(ingest_type, base_url)
+ #if existing:
+ # return self.process_existing(request, existing)
+
+ result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl)
+ if result.get('status') is not None:
+ result['request'] = request
+ return result
+
+ html_biblio = result.pop('_html_biblio')
+ resource = result.pop('_resource')
+
+ # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
- # XXX: html_guess_platform()
+ # TODO: could involve html_guess_platform() here?
# determine platform
platform_helper = None
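process() now consumes the _html_biblio/_resource handoff from fetch_resource_iteratively() and then selects a platform helper. A minimal dispatch sketch, assuming each FilesetPlatformHelper in DATASET_PLATFORM_HELPER_TABLE exposes a match_request() predicate (the method name and the 'no-platform-match' status are assumptions):

# Hypothetical dispatch; match_request() and its arguments are assumed,
# not shown in this diff.
platform_helper = None
for (helper_name, helper) in self.dataset_platform_helpers.items():
    if helper.match_request(request, resource, html_biblio):
        platform_helper = helper
        break
if not platform_helper:
    result['status'] = 'no-platform-match'
    return result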
@@ -291,10 +305,10 @@ class IngestFilesetWorker(IngestFileWorker):
#print(dataset_meta, file=sys.stderr)
platform = dataset_meta.platform_name
- result['platform'] = dataset_meta.platform_name
+ result['platform_name'] = dataset_meta.platform_name
+ result['platform_domain'] = dataset_meta.platform_domain
result['platform_id'] = dataset_meta.platform_id
- result['item_name'] = dataset_meta.archiveorg_item_name
- result['item_meta'] = dataset_meta.archiveorg_item_meta
+ result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name
if not dataset_meta.manifest:
result['status'] = 'empty-manifest'
@@ -309,6 +323,9 @@ class IngestFilesetWorker(IngestFileWorker):
result['status'] = 'too-large-size'
return result
if result['file_count'] > self.max_file_count:
+ # hard max, to prevent downstream breakage
+ if result['file_count'] > 10*1000:
+ result['manifest'] = result['manifest'][:self.max_file_count]
result['status'] = 'too-many-files'
return result
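The new hard max keeps oversized manifests from breaking downstream consumers: past 10,000 files the manifest itself is truncated to max_file_count entries before the 'too-many-files' result is returned. A worked example with the new defaults (values illustrative):

# Worked example of the truncation guard above; numbers are illustrative.
max_file_count = 200
manifest = [dict(path=f'file_{i}.csv') for i in range(12_000)]
file_count = len(manifest)        # 12000 > max_file_count
if file_count > 10*1000:          # hard max, to prevent downstream breakage
    manifest = manifest[:max_file_count]
# result['status'] = 'too-many-files'; return result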
@@ -327,8 +344,46 @@ class IngestFilesetWorker(IngestFileWorker):
# 4. Summarize status and return structured result metadata.
result['status'] = archive_result.status
result['manifest'] = [m.dict() for m in archive_result.manifest]
- result['file_count'] = len(archive_result.manifest) or None
- result['total_size'] = sum([m.size for m in archive_result.manifest if m.size]) or None
+
+ if ingest_strategy.endswith('-fileset-bundle'):
+ result['fileset_bundle'] = dict(
+ file_meta=archive_result.bundle_file_meta,
+ archiveorg_bundle_path=archive_result.archiveorg_bundle_path,
+ )
+ if archive_result.bundle_terminal:
+ result['fileset_bundle']['terminal'] = dict(
+ terminal_url=archive_result.bundle_terminal.terminal_url,
+ terminal_dt=archive_result.bundle_terminal.terminal_dt,
+ terminal_status_code=archive_result.bundle_terminal.terminal_status_code,
+ )
+ if archive_result.bundle_cdx:
+ result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_cdx)
+ if archive_result.bundle_cdx.revisit_cdx:
+ result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_cdx.revisit_cdx)
+ if ingest_strategy.endswith('-file'):
+ result['fileset_file'] = dict(
+ file_meta=archive_result.file_file_meta,
+ )
+ if archive_result.file_terminal:
+ result['fileset_file']['terminal'] = dict(
+ terminal_url=archive_result.file_terminal.terminal_url,
+ terminal_dt=archive_result.file_terminal.terminal_dt,
+ terminal_status_code=archive_result.file_terminal.terminal_status_code,
+ )
+ if archive_result.file_cdx:
+ result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_cdx)
+ if archive_result.file_cdx.revisit_cdx:
+ result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_cdx.revisit_cdx)
+
+ if result['status'].startswith('success'):
+ # check that these are still valid
+ assert result['file_count'] == len(archive_result.manifest)
+ assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size])
+
+
+ # XXX: here is where we would publish to ingest file result topic... or call process_hit()?
+ if result['status'] == 'success-file':
+ pass
if result['status'].startswith('success'):
result['hit'] = True
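The per-strategy blocks above replace the old flat file_count/total_size recomputation with nested fileset_bundle / fileset_file sections. A sketch of the bundle-style result shape on success (every value is a placeholder, not real output):

# Illustrative result shape; all values here are placeholders.
result['fileset_bundle'] = dict(
    file_meta=dict(),                  # archive_result.bundle_file_meta
    archiveorg_bundle_path='...',
    terminal=dict(
        terminal_url='https://...',
        terminal_dt='20211011000000',
        terminal_status_code=200,
    ),
    cdx=dict(),                        # cdx_to_dict(archive_result.bundle_cdx)
    revisit_cdx=dict(),                # only when a revisit record exists
)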