aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/ingest_tool.py1
-rw-r--r--python/sandcrawler/fileset_platforms.py4
-rw-r--r--python/sandcrawler/fileset_strategies.py26
-rw-r--r--python/sandcrawler/fileset_types.py6
-rw-r--r--python/sandcrawler/ingest_fileset.py89
5 files changed, 81 insertions, 45 deletions
diff --git a/python/ingest_tool.py b/python/ingest_tool.py
index 45e1812..fdb5b48 100755
--- a/python/ingest_tool.py
+++ b/python/ingest_tool.py
@@ -21,6 +21,7 @@ def run_single_ingest(args):
if request['ingest_type'] in ['dataset',]:
ingester = IngestFilesetWorker(
try_spn2=not args.no_spn2,
+ ingest_file_result_stdout=True,
)
else:
ingester = IngestFileWorker(
diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py
index bcf2144..cc07948 100644
--- a/python/sandcrawler/fileset_platforms.py
+++ b/python/sandcrawler/fileset_platforms.py
@@ -674,8 +674,8 @@ class ArchiveOrgHelper(FilesetPlatformHelper):
assert base_url_split[2] == 'archive.org'
assert base_url_split[3] in ['details', 'download']
item_name = base_url_split[4]
- if len(base_url_split) == 6:
- assert not base_url_split[5]
+ if len(base_url_split) == 6 and base_url_split[5]:
+ raise PlatformScopeError("got an archive.org file path, not download/details page; individual files not handled yet")
#print(f" archiveorg processing item={item_name}", file=sys.stderr)
item = self.session.get_item(item_name)
diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
index d1193ee..f2f2fcc 100644
--- a/python/sandcrawler/fileset_strategies.py
+++ b/python/sandcrawler/fileset_strategies.py
@@ -13,14 +13,14 @@ import internetarchive
from sandcrawler.html_metadata import BiblioMetadata
from sandcrawler.ia import ResourceResult, WaybackClient, SavePageNowClient, fix_transfer_encoding
from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, FilesetPlatformItem, ArchiveStrategyResult, PlatformScopeError
-from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path
+from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path, sanitize_fs_path
class FilesetIngestStrategy():
def __init__(self):
#self.ingest_strategy = 'unknown'
- pass
+ self.success_status = "success"
def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]:
raise NotImplementedError()
@@ -34,7 +34,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
def __init__(self, **kwargs):
self.ingest_strategy = IngestStrategy.ArchiveorgFileset
- # XXX: enable cleanup when confident (eg, safe path parsing)
+ # TODO: enable cleanup when confident (eg, safe path parsing)
self.skip_cleanup_local_files = kwargs.get('skip_cleanup_local_files', True)
self.working_dir = os.environ.get('SANDCRAWLER_WORKING_DIR', '/tmp/sandcrawler/')
try:
@@ -97,7 +97,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
# 1. download all files locally
assert item.manifest
for m in item.manifest:
- # XXX: enforce safe/sane filename
+ if m.path != sanitize_fs_path(m.path):
+ m.status = 'unsafe-path'
+ continue
local_path = local_dir + '/' + m.path
assert m.platform_url
@@ -173,7 +175,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
result = ArchiveStrategyResult(
ingest_strategy=self.ingest_strategy,
- status='success',
+ status=self.success_status,
manifest=item.manifest,
)
@@ -188,6 +190,7 @@ class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy):
def __init__(self):
super().__init__()
self.ingest_strategy = IngestStrategy.ArchiveorgFileset
+ self.success_status = "success-file"
class WebFilesetStrategy(FilesetIngestStrategy):
@@ -206,6 +209,8 @@ class WebFilesetStrategy(FilesetIngestStrategy):
"""
assert item.manifest
+ file_file_meta = None
+ file_resource = None
for m in item.manifest:
fetch_url = m.platform_url
if not fetch_url:
@@ -228,6 +233,8 @@ class WebFilesetStrategy(FilesetIngestStrategy):
m.terminal_url = resource.terminal_url
m.terminal_dt = resource.terminal_dt
m.status = resource.status
+ if self.ingest_strategy == "web-file":
+ file_resource = resource
if resource.status != 'success':
continue
@@ -237,6 +244,9 @@ class WebFilesetStrategy(FilesetIngestStrategy):
file_meta = gen_file_metadata(resource.body)
file_meta, html_resource = fix_transfer_encoding(file_meta, resource)
+ if self.ingest_strategy == "web-file":
+ file_file_meta = file_meta
+
if file_meta['size_bytes'] != m.size or (m.md5 and m.md5 != file_meta['md5hex']) or (m.sha1 and m.sha1 != file_meta['sha1hex']):
m.status = 'mismatch'
continue
@@ -246,7 +256,7 @@ class WebFilesetStrategy(FilesetIngestStrategy):
m.sha256 = m.sha256 or file_meta['sha256hex']
m.mimetype = m.mimetype or file_meta['mimetype']
- overall_status = "success"
+ overall_status = self.success_status
for m in item.manifest:
if m.status != 'success':
overall_status = m.status or 'not-processed'
@@ -259,6 +269,9 @@ class WebFilesetStrategy(FilesetIngestStrategy):
status=overall_status,
manifest=item.manifest,
)
+ if self.ingest_strategy == "web-file":
+ result.file_file_meta = file_file_meta
+ result.file_resource = file_resource
return result
class WebFileStrategy(WebFilesetStrategy):
@@ -266,6 +279,7 @@ class WebFileStrategy(WebFilesetStrategy):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.ingest_strategy = IngestStrategy.WebFile
+ self.success_status = "success-file"
FILESET_STRATEGY_HELPER_TABLE = {
diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py
index 037843e..d7e9d6d 100644
--- a/python/sandcrawler/fileset_types.py
+++ b/python/sandcrawler/fileset_types.py
@@ -43,11 +43,9 @@ class ArchiveStrategyResult(BaseModel):
status: str
manifest: List[FilesetManifestFile]
file_file_meta: Optional[Dict[str, Any]]
- file_terminal: Optional[Dict[str, Any]]
- file_cdx: Optional[Dict[str, Any]]
+ file_resource: Optional[Any]
bundle_file_meta: Optional[Dict[str, Any]]
- bundle_terminal: Optional[Any]
- bundle_cdx: Optional[Any]
+ bundle_resource: Optional[Any]
bundle_archiveorg_path: Optional[str]
class PlatformScopeError(Exception):
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index de392b2..7c0dfbd 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -49,7 +49,8 @@ class IngestFilesetWorker(IngestFileWorker):
self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024)
self.max_file_count = kwargs.get('max_file_count', 200)
-
+ self.ingest_file_result_sink = kwargs.get('ingest_file_result_sink')
+ self.ingest_file_result_stdout = kwargs.get('ingest_file_result_stdout', False)
def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
"""
@@ -71,9 +72,6 @@ class IngestFilesetWorker(IngestFileWorker):
"""
raise NotImplementedError("process_existing() not tested or safe yet")
- # XXX: use file version
- #def process_hit(self, ingest_type: str, resource: ResourceResult, file_meta: dict) -> dict:
-
def want(self, request: dict) -> bool:
if not request.get('ingest_type') in ('dataset',):
return False
@@ -254,6 +252,7 @@ class IngestFilesetWorker(IngestFileWorker):
# return self.process_existing(request, existing)
result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl)
+ result['request'] = request
if result.get('status') != None:
result['request'] = request
return result
@@ -308,6 +307,7 @@ class IngestFilesetWorker(IngestFileWorker):
result['platform_name'] = dataset_meta.platform_name
result['platform_domain'] = dataset_meta.platform_domain
result['platform_id'] = dataset_meta.platform_id
+ result['platform_base_url'] = dataset_meta.web_base_url
result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name
if not dataset_meta.manifest:
@@ -315,7 +315,7 @@ class IngestFilesetWorker(IngestFileWorker):
return result
# these will get confirmed/updated after ingest
- result['manifest'] = [m.dict() for m in dataset_meta.manifest]
+ result['manifest'] = [m.dict(exclude_none=True) for m in dataset_meta.manifest]
result['file_count'] = len(dataset_meta.manifest)
result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size])
@@ -343,47 +343,70 @@ class IngestFilesetWorker(IngestFileWorker):
# 4. Summarize status and return structured result metadata.
result['status'] = archive_result.status
- result['manifest'] = [m.dict() for m in archive_result.manifest]
+ result['manifest'] = [m.dict(exclude_none=True) for m in archive_result.manifest]
if ingest_strategy.endswith('-fileset-bundle'):
- result['fileset_bundle'] = dict(
- file_meta=archive_result.bundle_file_meta,
- archiveorg_bundle_path=archive_result.archiveorg_bundle_path,
- )
- if archive_result.bundle_terminal:
+ result['fileset_bundle'] = dict()
+ if archive_result.bundle_file_meta:
+ result['fileset_bundle']['file_meta'] = archive_result.bundle_file_meta
+ if archive_result.archiveorg_bundle_path:
+ result['fileset_bundle']['archiveorg_bundle_path'] = archive_result.archiveorg_bundle_path
+ if archive_result.bundle_resource:
result['fileset_bundle']['terminal'] = dict(
- terminal_url=archive_result.bundle_terminal.terminal_url,
- terminal_dt=archive_result.bundle_terminal.terminal_dt,
- terminal_status_code=archive_result.bundle_terminal.terminal_status_code,
+ terminal_url=archive_result.bundle_resource.terminal_url,
+ terminal_dt=archive_result.bundle_resource.terminal_dt,
+ terminal_status_code=archive_result.bundle_resource.terminal_status_code,
)
- if archive_result.bundle_cdx:
- result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_cdx)
- if archive_result.bundle_cdx.revisit_cdx:
- result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_cdx.revisit_cdx)
+ if archive_result.bundle_resource.cdx:
+ result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_resource.cdx)
+ if archive_result.bundle_resource.revisit_cdx:
+ result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_resource.revisit_cdx)
+
if ingest_strategy.endswith('-file'):
- result['fileset_file'] = dict(
- file_meta=archive_result.file_file_meta,
- )
- if archive_result.file_terminal:
+ result['fileset_file'] = dict()
+ if archive_result.file_file_meta:
+ result['fileset_file']['file_meta'] = file_meta=archive_result.file_file_meta,
+ if archive_result.file_resource:
result['fileset_file']['terminal'] = dict(
- terminal_url=archive_result.file_terminal.terminal_url,
- terminal_dt=archive_result.file_terminal.terminal_dt,
- terminal_status_code=archive_result.file_terminal.terminal_status_code,
+ terminal_url=archive_result.file_resource.terminal_url,
+ terminal_dt=archive_result.file_resource.terminal_dt,
+ terminal_status_code=archive_result.file_resource.terminal_status_code,
)
- if archive_result.file_cdx:
- result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_cdx)
- if archive_result.file_cdx.revisit_cdx:
- result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_cdx.revisit_cdx)
+ if archive_result.file_resource.cdx:
+ result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
+ if archive_result.file_resource.revisit_cdx:
+ result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
if result['status'].startswith('success'):
# check that these are still valid
assert result['file_count'] == len(archive_result.manifest)
assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size])
-
- # XXX: here is where we would publish to ingest file result topic... or call process_hit()?
- if result['status'] == 'success-file':
- pass
+ if result['status'] == 'success-file' and archive_result.file_resource and archive_result.file_file_meta:
+ file_result = dict(
+ hit=True,
+ status='success',
+ request=request.copy(),
+ file_meta=archive_result.file_file_meta,
+ terminal=dict(
+ terminal_url=archive_result.file_resource.terminal_url,
+ terminal_dt=archive_result.file_resource.terminal_dt,
+ terminal_status_code=archive_result.file_resource.terminal_status_code,
+ terminal_sha1hex=archive_result.file_file_meta['sha1hex'],
+ ),
+ )
+ if archive_result.file_resource.cdx:
+ file_result['cdx'] = cdx_to_dict(archive_result.file_resource.cdx)
+ if archive_result.file_resource.revisit_cdx:
+ file_result['revisit_cdx'] = cdx_to_dict(archive_result.file_resource.revisit_cdx)
+ file_result['request']['ingest_type'] = request['ingest_type'] + "-file"
+ # call the super() (ingest_file) version of process_hit()
+ info = self.process_file_hit(file_result['request']['ingest_type'], archive_result.file_resource, archive_result.file_file_meta)
+ file_result.update(info)
+ if self.ingest_file_result_sink:
+ self.ingest_file_result_sink.push_record(result.copy())
+ elif self.ingest_file_result_stdout:
+ sys.stdout.write(json.dumps(file_result, sort_keys=True) + "\n")
if result['status'].startswith('success'):
result['hit'] = True