aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/ingest_fileset.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/ingest_fileset.py')
-rw-r--r--python/sandcrawler/ingest_fileset.py89
1 files changed, 56 insertions, 33 deletions
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index de392b2..7c0dfbd 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -49,7 +49,8 @@ class IngestFilesetWorker(IngestFileWorker):
self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024)
self.max_file_count = kwargs.get('max_file_count', 200)
-
+ self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink')
+ self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False)
def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
"""
@@ -71,9 +72,6 @@ class IngestFilesetWorker(IngestFileWorker):
"""
raise NotImplementedError("process_existing() not tested or safe yet")
- # XXX: use file version
- #def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict:
-
def want(self, request: dict) -> bool:
if not request.get('ingest_type') in ('dataset',):
return False
@@ -254,6 +252,7 @@ class IngestFilesetWorker(IngestFileWorker):
# return self.process_existing(request, existing)
result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl)
+ result['request'] = request
if result.get('status') != None:
result['request'] = request
return result
@@ -308,6 +307,7 @@ class IngestFilesetWorker(IngestFileWorker):
result['platform_name'] = dataset_meta.platform_name
result['platform_domain'] = dataset_meta.platform_domain
result['platform_id'] = dataset_meta.platform_id
+ result['platform_base_url'] = dataset_meta.web_base_url
result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name
if not dataset_meta.manifest:
@@ -315,7 +315,7 @@ class IngestFilesetWorker(IngestFileWorker):
return result
# these will get confirmed/updated after ingest
- result['manifest'] = [m.dict() for m in dataset_meta.manifest]
+ result['manifest'] = [m.dict(exclude_none=True) for m in dataset_meta.manifest]
result['file_count'] = len(dataset_meta.manifest)
result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size])
@@ -343,47 +343,70 @@ class IngestFilesetWorker(IngestFileWorker):
# 4. Summarize status and return structured result metadata.
result['status'] = archive_result.status
- result['manifest'] = [m.dict() for m in archive_result.manifest]
+ result['manifest'] = [m.dict(exclude_none=True) for m in archive_result.manifest]
if ingest_strategy.endswith('-fileset-bundle'):
- result['fileset_bundle'] = dict(
- file_meta=archive_result.bundle_file_meta,
- archiveorg_bundle_path=archive_result.archiveorg_bundle_path,
- )
- if archive_result.bundle_terminal:
+ result['fileset_bundle'] = dict()
+ if archive_result.bundle_file_meta:
+ result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta
+ if archive_result.archiveorg_bundle_path:
+ result['fileset_bundle']['archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path
+ if archive_result.bundle_resource:
result['fileset_bundle']['terminal'] = dict(
- terminal_url=archive_result.bundle_terminal.terminal_url,
- terminal_dt=archive_result.bundle_terminal.terminal_dt,
- terminal_status_code=archive_result.bundle_terminal.terminal_status_code,
+ terminal_url=archive_result.bundle_resource.terminal_url,
+ terminal_dt=archive_result.bundle_resource.terminal_dt,
+ terminal_status_code=archive_result.bundle_resource.terminal_status_code,
)
- if archive_result.bundle_cdx:
- result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_cdx)
- if archive_result.bundle_cdx.revisit_cdx:
- result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_cdx.revisit_cdx)
+ if archive_result.bundle_resource.cdx:
+ result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_resource.cdx)
+ if archive_result.bundle_resource.revisit_cdx:
+ result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_resource.revisit_cdx)
+
if ingest_strategy.endswith('-file'):
- result['fileset_file'] = dict(
- file_meta=archive_result.file_file_meta,
- )
- if archive_result.file_terminal:
+ result['fileset_file'] = dict()
+ if archive_result.file_file_meta:
+ result['fileset_file']['file_meta'] = file_meta=archive_result.file_file_meta,
+ if archive_result.file_resource:
result['fileset_file']['terminal'] = dict(
- terminal_url=archive_result.file_terminal.terminal_url,
- terminal_dt=archive_result.file_terminal.terminal_dt,
- terminal_status_code=archive_result.file_terminal.terminal_status_code,
+ terminal_url=archive_result.file_resource.terminal_url,
+ terminal_dt=archive_result.file_resource.terminal_dt,
+ terminal_status_code=archive_result.file_resource.terminal_status_code,
)
- if archive_result.file_cdx:
- result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_cdx)
- if archive_result.file_cdx.revisit_cdx:
- result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_cdx.revisit_cdx)
+ if archive_result.file_resource.cdx:
+ result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
+ if archive_result.file_resource.revisit_cdx:
+ result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
if result['status'].startswith('success'):
# check that these are still valid
assert result['file_count'] == len(archive_result.manifest)
assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size])
-
- # XXX: here is where we would publish to ingest file result topic... or call process_hit()?
- if result['status'] == 'success-file':
- pass
+ if result['status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta:
+ file_result = dict(
+ hit=True,
+ status='success',
+ request=request.copy(),
+ file_meta=archive_result.file_file_meta,
+ terminal=dict(
+ terminal_url=archive_result.file_resource.terminal_url,
+ terminal_dt=archive_result.file_resource.terminal_dt,
+ terminal_status_code=archive_result.file_resource.terminal_status_code,
+ terminal_sha1hex=archive_result.file_file_meta['sha1hex'],
+ ),
+ )
+ if archive_result.file_resource.cdx:
+ file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
+ if archive_result.file_resource.revisit_cdx:
+ file_result['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
+ file_result['request']['ingest_type'] = request['ingest_type'] + "-file"
+ # call the super() (ingest_file) version of process_hit()
+ info = self.process_file_hit(file_result['request']['ingest_type'], archive_result.file_resource, archive_result.file_file_meta)
+ file_result.update(info)
+ if self.ingest_file_result_sink:
+ self.ingest_file_result_sink.push_record(result.copy())
+ elif self.ingest_file_result_stdout:
+ sys.stdout.write(json.dumps(file_result, sort_keys=True) + "\n")
if result['status'].startswith('success'):
result['hit'] = True