aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2021-10-06 15:13:46 -0700
committerBryan Newbold <bnewbold@archive.org>2021-10-15 18:15:29 -0700
commit07e8a199766be77f4e89561d03e9b4e995ab7396 (patch)
treef4882b3fd32e0ed46e2359900a01e1413287d53e
parent206969ccebb5007b6c687edd6e09b5c4910e0152 (diff)
downloadsandcrawler-07e8a199766be77f4e89561d03e9b4e995ab7396.tar.gz
sandcrawler-07e8a199766be77f4e89561d03e9b4e995ab7396.zip
fileset ingest progress for dataverse
-rw-r--r--python/sandcrawler/fileset_platforms.py153
-rw-r--r--python/sandcrawler/fileset_strategies.py145
-rw-r--r--python/sandcrawler/fileset_types.py2
-rw-r--r--python/sandcrawler/ingest_fileset.py14
4 files changed, 291 insertions, 23 deletions
diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py
index 5342a4e..58094c2 100644
--- a/python/sandcrawler/fileset_platforms.py
+++ b/python/sandcrawler/fileset_platforms.py
@@ -3,9 +3,11 @@ import sys
import json
import gzip
import time
+import urllib.parse
from collections import namedtuple
from typing import Optional, Tuple, Any, Dict, List
+import requests
import internetarchive
from sandcrawler.html_metadata import BiblioMetadata
@@ -30,20 +32,159 @@ class DatasetPlatformHelper():
"""
raise NotImplementedError()
- def chose_strategy(self, DatasetPlatformItem) -> IngestStrategy:
- raise NotImplementedError()
+ def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy:
+ assert item.manifest
+ total_size = sum([m.size for m in item.manifest])
+ largest_size = max([m.size for m in item.manifest])
+ print(f" total_size={total_size} largest_size={largest_size}", file=sys.stderr)
+ # XXX: while developing ArchiveorgFileset path
+ return IngestStrategy.ArchiveorgFileset
+ if len(item.manifest) == 1:
+ if total_size < 128*1024*1024:
+ return IngestStrategy.WebFile
+ else:
+ return IngestStrategy.ArchiveorgFile
+ else:
+ if largest_size < 128*1024*1024 and total_size < 1*1024*1024*1024:
+ return IngestStrategy.WebFileset
+ else:
+ return IngestStrategy.ArchiveorgFileset
class DataverseHelper(DatasetPlatformHelper):
def __init__(self):
self.platform_name = 'dataverse'
+ self.session = requests.Session()
+ self.dataverse_domain_allowlist = [
+ 'dataverse.harvard.edu',
+ 'data.lipi.go.id',
+ ]
def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
+ """
+ XXX: should match process_request() logic better
+ """
+
+ components = urllib.parse.urlparse(request['base_url'])
+ platform_domain = components.netloc.split(':')[0].lower()
+ params = urllib.parse.parse_qs(components.query)
+ platform_id = params.get('persistentId')
+
+ if not platform_domain in self.dataverse_domain_allowlist:
+ return False
+ if not platform_id:
+ return False
+
+ if html_biblio and 'dataverse' in html_biblio.publisher.lower():
+ return True
return False
- def chose_strategy(self, DatasetPlatformItem) -> IngestStrategy:
- raise NotImplementedError()
+ def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+ """
+ Fetch platform-specific metadata for this request (eg, via API calls)
+
+
+ HTTP GET https://demo.dataverse.org/api/datasets/export?exporter=dataverse_json&persistentId=doi:10.5072/FK2/J8SJZB
+
+ """
+ # 1. extract domain, PID, and version from URL
+ components = urllib.parse.urlparse(request['base_url'])
+ platform_domain = components.netloc.split(':')[0].lower()
+ params = urllib.parse.parse_qs(components.query)
+ dataset_version = params.get('version')
+ platform_id = params.get('persistentId')
+ if not (platform_id and platform_id[0]):
+ raise ValueError("Expected a Dataverse persistentId in URL")
+ else:
+ platform_id = platform_id[0]
+
+ if not platform_domain in self.dataverse_domain_allowlist:
+ raise ValueError(f"unexpected dataverse domain: {platform_domain}")
+
+ # for both handle (hdl:) and DOI (doi:) identifiers, the norm is for
+ # dataverse persistetId is to be structured like:
+ # <prefix> / <shoulder> / <dataset-id> / <file-id>
+ if not (platform_id.startswith('doi:10.') or platform_id.startswith('hdl:')):
+ raise NotImplementedError(f"unsupported dataverse persistentId format: {platform_id}")
+ dataverse_type = None
+ if platform_id.count('/') == 2:
+ dataverse_type = 'dataset'
+ elif platform_id.count('/') == 3:
+ dataverse_type = 'file'
+ else:
+ raise NotImplementedError(f"unsupported dataverse persistentId format: {platform_id}")
+
+ if dataverse_type != 'dataset':
+ # XXX
+ raise NotImplementedError(f"only entire dataverse datasets can be archived with this tool")
+
+ # 1b. if we didn't get a version number from URL, fetch it from API
+ if not dataset_version:
+ obj = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}").json()
+ obj_latest = obj['data']['latestVersion']
+ dataset_version = f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}"
+
+ # 2. API fetch
+ obj = self.session.get(f"https://{platform_domain}/api/datasets/:persistentId/?persistentId={platform_id}&version={dataset_version}").json()
+
+ obj_latest= obj['data']['latestVersion']
+ assert dataset_version == f"{obj_latest['versionNumber']}.{obj_latest['versionMinorNumber']}"
+ assert platform_id == obj_latest['datasetPersistentId']
+
+ manifest = []
+ for row in obj_latest['files']:
+ df = row['dataFile']
+ df_persistent_id = df['persistentId']
+ platform_url = f"https://{platform_domain}/api/access/datafile/:persistentId/?persistentId={df_persistent_id}"
+ if df.get('originalFileName'):
+ platform_url += '&format=original'
+ manifest.append(FilesetManifestFile(
+ path=df.get('originalFileName') or df['filename'],
+ size=df.get('originalFileSize') or df['filesize'],
+ md5=df['md5'],
+ # NOTE: don't get: sha1, sha256
+ mimetype=df['contentType'],
+ platform_url=platform_url,
+ extra=dict(
+ # file-level
+ description=df.get('description'),
+ version=df.get('version'),
+ ),
+ ))
+
+ platform_sub_id = platform_id.split('/')[-1]
+ archiveorg_item_name = f"{platform_domain}-{platform_sub_id}-v{dataset_version}"
+ archiveorg_item_meta = dict(
+ # XXX: collection=platform_domain,
+ collection="datasets",
+ date=obj_latest['releaseTime'].split('T')[0],
+ source=f"https://{platform_domain}/dataset.xhtml?persistentId={platform_id}&version={dataset_version}",
+ )
+ if platform_id.startswith('doi:10.'):
+ archiveorg_item_meta['doi'] = platform_id.replace('doi:', '')
+ for block in obj_latest['metadataBlocks']['citation']['fields']:
+ if block['typeName'] == 'title':
+ archiveorg_item_meta['title'] = block['value']
+ elif block['typeName'] == 'depositor':
+ archiveorg_item_meta['creator'] = block['value']
+ elif block['typeName'] == 'dsDescription':
+ archiveorg_item_meta['description'] = block['value'][0]['dsDescriptionValue']['value']
+
+ archiveorg_item_meta['description'] = archiveorg_item_meta.get('description', '') + '\n<br>\n' + obj_latest['termsOfUse']
+
+ return DatasetPlatformItem(
+ platform_name=self.platform_name,
+ platform_status='success',
+ manifest=manifest,
+ platform_domain=platform_domain,
+ platform_id=platform_id,
+ archiveorg_item_name=archiveorg_item_name,
+ archiveorg_item_meta=archiveorg_item_meta,
+ web_bundle_url=f"https://{platform_domain}/api/access/dataset/:persistentId/?persistentId={platform_id}&format=original",
+ # TODO: web_base_url= (for GWB downloading, in lieu of platform_url on individual files)
+ extra=dict(version=dataset_version),
+ )
class ArchiveOrgHelper(DatasetPlatformHelper):
@@ -174,7 +315,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper):
platform_domain='archive.org',
platform_id=item_name,
archiveorg_item_name=item_name,
- archiveorg_collection=item_collection,
+ archiveorg_meta=dict(collection=item_collection),
)
def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy:
@@ -185,7 +326,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper):
elif len(item.manifest) >= 1:
return IngestStrategy.ArchiveorgFileset
else:
- raise NotImplementedError()
+ raise NotImplementedError("empty dataset")
DATASET_PLATFORM_HELPER_TABLE = {
diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
index 26bc5ad..c335ea6 100644
--- a/python/sandcrawler/fileset_strategies.py
+++ b/python/sandcrawler/fileset_strategies.py
@@ -1,8 +1,10 @@
+import os
import sys
import json
import gzip
import time
+import shutil
from collections import namedtuple
from typing import Optional, Tuple, Any, Dict, List
@@ -11,6 +13,7 @@ import internetarchive
from sandcrawler.html_metadata import BiblioMetadata
from sandcrawler.ia import ResourceResult
from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, DatasetPlatformItem, ArchiveStrategyResult
+from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path
class FilesetIngestStrategy():
@@ -28,27 +31,42 @@ class FilesetIngestStrategy():
class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
- def __init__(self):
+ def __init__(self, **kwargs):
self.ingest_strategy = IngestStrategy.ArchiveorgFileset
- self.session = internetarchive.get_session()
+
+ # XXX: enable cleanup when confident (eg, safe path parsing)
+ self.skip_cleanup_local_files = kwargs.get('skip_cleanup_local_files', True)
+ self.working_dir = os.environ.get('SANDCRAWLER_WORKING_DIR', '/tmp/sandcrawler/')
+ try:
+ os.mkdir(self.working_dir)
+ except FileExistsError:
+ pass
+
+ self.ia_session = internetarchive.get_session()
def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]:
"""
use API to check for item with all the files in the manifest
- TODO: this naive comparison is quadratic in number of files, aka O(N^2)
- XXX: should this verify sha256 and/or mimetype?
+ NOTE: this naive comparison is quadratic in number of files, aka O(N^2)
"""
- ia_item = self.session.get_item(item.archiveorg_item_name)
+ ia_item = self.ia_session.get_item(item.archiveorg_item_name)
+ if not ia_item.exists:
+ return False
item_files = ia_item.get_files(on_the_fly=False)
for wanted in item.manifest:
found = False
for existing in item_files:
- if existing.sha1 == wanted.sha1 and existing.name == wanted.path and existing.size == wanted.size:
- found = True
- break
+ if existing.name == wanted.path:
+ if ((existing.sha1 and existing.sha1 == wanted.sha1) or (existing.md5 and existing.md5 == wanted.md5)) and existing.name == wanted.path and existing.size == wanted.size:
+ found = True
+ wanted.status = 'exists'
+ break
+ else:
+ wanted.status = 'mismatch-existing'
+ break
if not found:
- print(f" didn't find at least one file: {wanted}", file=sys.stderr)
+ print(f" item exists ({item.archiveorg_item_name}) but didn't find at least one file: {wanted.path}", file=sys.stderr)
return None
return ArchiveStrategyResult(
ingest_strategy=self.ingest_strategy,
@@ -57,11 +75,118 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
)
def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult:
+ """
+ May require extra context to pass along to archive.org item creation.
+ """
existing = self.check_existing(item)
if existing:
return existing
- raise NotImplementedError()
+
+ local_dir = self.working_dir + item.archiveorg_item_name
+ assert local_dir.startswith('/')
+ assert local_dir.count('/') > 2
+ try:
+ os.mkdir(local_dir)
+ except FileExistsError:
+ pass
+
+ # 1. download all files locally
+ for m in item.manifest:
+ # XXX: enforce safe/sane filename
+
+ local_path = local_dir + '/' + m.path
+ assert m.platform_url
+
+ if not os.path.exists(local_path):
+ print(f" downloading {m.path}", file=sys.stderr)
+ with self.ia_session.get(m.platform_url, stream=True, allow_redirects=True) as r:
+ r.raise_for_status()
+ with open(local_path + '.partial', 'wb') as f:
+ for chunk in r.iter_content(chunk_size=256*1024):
+ f.write(chunk)
+ os.rename(local_path + '.partial', local_path)
+ m.status = 'downloaded-local'
+ else:
+ m.status = 'exists-local'
+
+ print(f" verifying {m.path}", file=sys.stderr)
+ file_meta = gen_file_metadata_path(local_path, allow_empty=True)
+ assert file_meta['size_bytes'] == m.size, f"expected: {m.size} found: {file_meta['size_bytes']}"
+
+ if m.sha1:
+ assert file_meta['sha1hex'] == m.sha1
+ else:
+ m.sha1 = file_meta['sha1hex']
+
+ if m.sha256:
+ assert file_meta['sha256hex'] == m.sha256
+ else:
+ m.sha256 = file_meta['sha256hex']
+
+ if m.md5:
+ assert file_meta['md5hex'] == m.md5
+ else:
+ m.md5 = file_meta['md5hex']
+
+ if m.mimetype:
+ # 'magic' isn't good and parsing more detailed text file formats like text/csv
+ if file_meta['mimetype'] != m.mimetype and file_meta['mimetype'] != 'text/plain':
+ # these 'tab-separated-values' from dataverse are just noise, don't log them
+ if m.mimetype != 'text/tab-separated-values':
+ print(f" WARN: mimetype mismatch: expected {m.mimetype}, found {file_meta['mimetype']}", file=sys.stderr)
+ m.mimetype = file_meta['mimetype']
+ else:
+ m.mimetype = file_meta['mimetype']
+ m.status = 'verified-local'
+
+ # 2. setup archive.org item metadata
+ assert item.archiveorg_item_meta['collection']
+ # 3. upload all files
+ item_files = []
+ for m in item.manifest:
+ local_path = local_dir + '/' + m.path
+ item_files.append({
+ 'name': local_path,
+ 'remote_name': m.path,
+ })
+
+ print(f" uploading all files to {item.archiveorg_item_name} under {item.archiveorg_item_meta.get('collection')}...", file=sys.stderr)
+ internetarchive.upload(
+ item.archiveorg_item_name,
+ files=item_files,
+ metadata=item.archiveorg_item_meta,
+ checksum=True,
+ queue_derive=False,
+ verify=True,
+ )
+
+ for m in item.manifest:
+ m.status = 'success'
+
+ # 4. delete local directory
+ if not self.skip_cleanup_local_files:
+ shutil.rmtree(local_dir)
+
+ result = ArchiveStrategyResult(
+ ingest_strategy=self.ingest_strategy,
+ status='success',
+ manifest=item.manifest,
+ )
+
+ return result
+
+class ArchiveorgFileStrategy(ArchiveorgFilesetStrategy):
+ """
+ ArchiveorgFilesetStrategy currently works fine with individual files. Just
+ need to over-ride the ingest_strategy name.
+ """
+
+ def __init__(self):
+ super().__init__()
+ self.ingest_strategy = IngestStrategy.ArchiveorgFileset
+
FILESET_STRATEGY_HELPER_TABLE = {
IngestStrategy.ArchiveorgFileset: ArchiveorgFilesetStrategy(),
+ IngestStrategy.ArchiveorgFile: ArchiveorgFileStrategy(),
}
diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py
index 75f4992..51000d7 100644
--- a/python/sandcrawler/fileset_types.py
+++ b/python/sandcrawler/fileset_types.py
@@ -34,7 +34,7 @@ class DatasetPlatformItem(BaseModel):
platform_domain: Optional[str]
platform_id: Optional[str]
archiveorg_item_name: Optional[str]
- archiveorg_collection: Optional[str]
+ archiveorg_item_meta: Optional[dict]
web_base_url: Optional[str]
web_bundle_url: Optional[str]
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index 50e3c05..3b55793 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -262,19 +262,21 @@ class IngestFilesetWorker(IngestFileWorker):
if resource:
terminal_url = resource.terminal_url
dataset_meta = platform_helper.process_request(request, terminal_url, html_biblio)
- print(dataset_meta, file=sys.stderr)
+ #print(dataset_meta, file=sys.stderr)
platform = dataset_meta.platform_name
result['platform'] = dataset_meta.platform_name
result['platform_id'] = dataset_meta.platform_id
result['item_name'] = dataset_meta.archiveorg_item_name
- if not dataset_meta.manifest:
+ result['item_meta'] = dataset_meta.archiveorg_item_meta
+
+ if dataset_meta.manifest:
+ result['manifest'] = [m.dict() for m in dataset_meta.manifest]
+ result['file_count'] = len(dataset_meta.manifest)
+ result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size])
+ else:
result['status'] = 'no-manifest'
return result
- result['manifest'] = dataset_meta.manifest or None
- result['file_count'] = len(dataset_meta.manifest) or None
- result['total_size'] = sum([m.size for m in dataset_meta.manifest if m.size]) or None
-
ingest_strategy = platform_helper.chose_strategy(dataset_meta)
result['ingest_strategy'] = ingest_strategy