-rw-r--r--   proposals/2021-09-09_fileset_ingest.md     |  33
-rw-r--r--   python/sandcrawler/fileset_platforms.py    |  69
-rw-r--r--   python/sandcrawler/fileset_strategies.py   |  23
-rw-r--r--   python/sandcrawler/fileset_types.py        |  15
-rw-r--r--   python/sandcrawler/ingest_fileset.py       | 123
5 files changed, 167 insertions, 96 deletions
diff --git a/proposals/2021-09-09_fileset_ingest.md b/proposals/2021-09-09_fileset_ingest.md
index bb9d358..82da7d7 100644
--- a/proposals/2021-09-09_fileset_ingest.md
+++ b/proposals/2021-09-09_fileset_ingest.md
@@ -121,14 +121,9 @@ New python types:
         ingest_strategy: str
         status: str
         manifest: List[FilesetManifestFile]
-
-    FilesetIngestResult
-        ingest_strategy: str
-        status: str
-        manifest: List[FilesetManifestFile]
-        single_file_meta: Optional[dict]
-        single_terminal: Optional[dict]
-        single_cdx: Optional[dict]
+        file_file_meta: Optional[dict]
+        file_terminal: Optional[dict]
+        file_cdx: Optional[dict]
         bundle_file_meta: Optional[dict]
         bundle_terminal: Optional[dict]
         bundle_cdx: Optional[dict]
@@ -160,6 +155,9 @@ New python APIs/classes:
   valid platform, which could be found via API or parsing, but has the wrong
   scope. Eg, tried to fetch a dataset, but got a DOI which represents all
   versions of the dataset, not a specific version.
+- `platform-restricted`/`PlatformRestrictedError`: for, eg, embargos
+- `platform-404`: got to a landing page, and seemed like in-scope, but no
+  platform record found anyways
 
 ## New Sandcrawler Code and Worker
 
@@ -216,11 +214,14 @@ Additional fileset-specific fields:
     platform_id: str
     ingest_strategy: str
     archiveorg_item_name: str (optional, only for `archiveorg-*` strategies)
+    file_count: int
+    total_size: int
     fileset_bundle (optional, only for `*-fileset-bundle` strategy)
-        archiveorg_bundle_path
         file_meta
         cdx
+        revisit_cdx
         terminal
+        archiveorg_bundle_path
     fileset_file (optional, only for `*-file` strategy)
         file_meta
         terminal
@@ -247,6 +248,9 @@ condition.
 
 ## New SQL Tables
 
+Note that this table *complements* `ingest_file_result`, doesn't replace it.
+`ingest_file_result` could more accurately be called `ingest_result`.
+
     CREATE TABLE IF NOT EXISTS ingest_fileset_platform (
         ingest_type             TEXT NOT NULL CHECK (octet_length(ingest_type) >= 1),
         base_url                TEXT NOT NULL CHECK (octet_length(base_url) >= 1),
@@ -254,9 +258,9 @@ condition.
         hit                     BOOLEAN NOT NULL,
         status                  TEXT CHECK (octet_length(status) >= 1),
 
-        platform_name           TEXT CHECK (octet_length(platform) >= 1),
-        platform_domain         TEXT CHECK (octet_length(platform_domain) >= 1),
-        platform_id             TEXT CHECK (octet_length(platform_id) >= 1),
+        platform_name           TEXT NOT NULL CHECK (octet_length(platform) >= 1),
+        platform_domain         TEXT NOT NULL CHECK (octet_length(platform_domain) >= 1),
+        platform_id             TEXT NOT NULL CHECK (octet_length(platform_id) >= 1),
         ingest_strategy         TEXT CHECK (octet_length(ingest_strategy) >= 1),
         total_size              BIGINT,
         file_count              INT,
@@ -282,9 +286,10 @@ condition.
         PRIMARY KEY (ingest_type, base_url)
     );
 
-    CREATE INDEX ingest_fileset_result_terminal_url_idx ON ingest_fileset_result(terminal_url);
-    # TODO: index on (platform_name,platform_domain,platform_id) ?
+    CREATE INDEX ingest_fileset_platform_name_domain_id_idx ON ingest_fileset_platform(platform_name, platform_domain, platform_id);
+Persist worker should only insert in to this table if `platform_name`,
+`platform_domain`, and `platform_id` are extracted successfully.
 
 ## New Kafka Topic
 
diff --git a/python/sandcrawler/fileset_platforms.py b/python/sandcrawler/fileset_platforms.py
index 5f2f743..bcf2144 100644
--- a/python/sandcrawler/fileset_platforms.py
+++ b/python/sandcrawler/fileset_platforms.py
@@ -15,7 +15,7 @@ from sandcrawler.ia import ResourceResult
 from sandcrawler.fileset_types import *
 
 
-class DatasetPlatformHelper():
+class FilesetPlatformHelper():
 
     def __init__(self):
         self.platform_name = 'unknown'
@@ -26,16 +26,16 @@ class DatasetPlatformHelper():
         """
         raise NotImplementedError()
 
-    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
         """
         Fetch platform-specific metadata for this request (eg, via API calls)
         """
         raise NotImplementedError()
 
-    def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy:
+    def chose_strategy(self, item: FilesetPlatformItem) -> IngestStrategy:
         assert item.manifest
-        total_size = sum([m.size for m in item.manifest])
-        largest_size = max([m.size for m in item.manifest])
+        total_size = sum([m.size for m in item.manifest]) or 0
+        largest_size = max([m.size or 0 for m in item.manifest]) or 0
         #print(f"  total_size={total_size} largest_size={largest_size}", file=sys.stderr)
         # XXX: while developing ArchiveorgFileset path
         #return IngestStrategy.ArchiveorgFileset
@@ -51,7 +51,7 @@ class DatasetPlatformHelper():
                 return IngestStrategy.ArchiveorgFileset
 
 
-class DataverseHelper(DatasetPlatformHelper):
+class DataverseHelper(FilesetPlatformHelper):
 
     def __init__(self):
         self.platform_name = 'dataverse'
@@ -133,10 +133,10 @@ class DataverseHelper(DatasetPlatformHelper):
         components = urllib.parse.urlparse(url)
         platform_domain = components.netloc.split(':')[0].lower()
         params = urllib.parse.parse_qs(components.query)
-        platform_id = params.get('persistentId')
-        if not platform_id:
+        id_param = params.get('persistentId')
+        if not id_param:
             return False
-        platform_id = platform_id[0]
+        platform_id = id_param[0]
 
         try:
             parsed = self.parse_dataverse_persistentid(platform_id)
@@ -145,7 +145,7 @@ class DataverseHelper(DatasetPlatformHelper):
         return True
 
-    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
         """
         Fetch platform-specific metadata for this request (eg, via API calls)
 
@@ -162,14 +162,14 @@ class DataverseHelper(DatasetPlatformHelper):
         components = urllib.parse.urlparse(url)
         platform_domain = components.netloc.split(':')[0].lower()
         params = urllib.parse.parse_qs(components.query)
-        dataset_version = params.get('version')
-        platform_id = params.get('persistentId')
-        if not (platform_id and platform_id[0]):
+        id_param = params.get('persistentId')
+        if not (id_param and id_param[0]):
             raise PlatformScopeError("Expected a Dataverse persistentId in URL")
-        else:
-            platform_id = platform_id[0]
+        platform_id = id_param[0]
+        version_param = params.get('version')
+        dataset_version = None
+        if version_param:
+            dataset_version = version_param[0]
 
         try:
             parsed_id = self.parse_dataverse_persistentid(platform_id)
@@ -243,7 +243,7 @@ class DataverseHelper(DatasetPlatformHelper):
         if obj_latest.get('termsOfUse'):
             archiveorg_item_meta['description'] += '\n<br>\n' + obj_latest['termsOfUse']
 
-        return DatasetPlatformItem(
+        return FilesetPlatformItem(
             platform_name=self.platform_name,
             platform_status='success',
             manifest=manifest,
@@ -321,18 +321,18 @@ def test_parse_dataverse_persistentid():
         except ValueError:
             pass
 
-class FigshareHelper(DatasetPlatformHelper):
+class FigshareHelper(FilesetPlatformHelper):
 
     def __init__(self):
         self.platform_name = 'figshare'
         self.session = requests.Session()
 
     @staticmethod
-    def parse_figshare_url_path(path: str) -> List[str]:
+    def parse_figshare_url_path(path: str) -> Tuple[str, Optional[str]]:
         """
         Tries to parse a figshare URL into ID number and (optional) version number.
 
-        Returns a two-element list; version number will be None if not found
+        Returns a two-element tuple; version number will be None if not found
 
         Raises a ValueError if not a figshare URL
         """
@@ -340,14 +340,14 @@ class FigshareHelper(DatasetPlatformHelper):
         comp = path.split('/')
         if len(comp) < 4 or comp[1] != 'articles':
-            raise ValueError
+            raise ValueError(f"not a figshare URL: {path}")
 
         if len(comp) == 5 and comp[3].isdigit() and comp[4].isdigit():
             return (comp[3], comp[4])
         elif len(comp) == 4 and comp[3].isdigit():
             return (comp[3], None)
         else:
-            raise ValueError
+            raise ValueError(f"couldn't find figshare identiier: {path}")
 
     def match_request(self, request: dict , resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> bool:
@@ -374,7 +374,7 @@ class FigshareHelper(DatasetPlatformHelper):
         return False
 
-    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
         """
         Fetch platform-specific metadata for this request (eg, via API calls)
         """
@@ -390,7 +390,7 @@ class FigshareHelper(DatasetPlatformHelper):
         (platform_id, dataset_version) = self.parse_figshare_url_path(components.path)
         assert platform_id.isdigit(), f"expected numeric: {platform_id}"
-        assert dataset_version.isdigit(), f"expected numeric: {dataset_version}"
+        assert dataset_version and dataset_version.isdigit(), f"expected numeric: {dataset_version}"
 
         # 1b. if we didn't get a version number from URL, fetch it from API
         # TODO: implement this code path
@@ -436,7 +436,7 @@ class FigshareHelper(DatasetPlatformHelper):
             version=obj['version'],
         )
 
-        return DatasetPlatformItem(
+        return FilesetPlatformItem(
             platform_name=self.platform_name,
             platform_status='success',
             manifest=manifest,
@@ -471,7 +471,7 @@ def test_parse_figshare_url_path():
         except ValueError:
             pass
 
-class ZenodoHelper(DatasetPlatformHelper):
+class ZenodoHelper(FilesetPlatformHelper):
 
     def __init__(self):
         self.platform_name = 'zenodo'
@@ -490,7 +490,7 @@ class ZenodoHelper(DatasetPlatformHelper):
             return True
         return False
 
-    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
         """
         Fetch platform-specific metadata for this request (eg, via API calls)
         """
@@ -567,7 +567,7 @@ class ZenodoHelper(DatasetPlatformHelper):
             # obj['metadata']['version'] is, eg, git version tag
         )
 
-        return DatasetPlatformItem(
+        return FilesetPlatformItem(
             platform_name=self.platform_name,
             platform_status='success',
             manifest=manifest,
@@ -581,7 +581,7 @@ class ZenodoHelper(DatasetPlatformHelper):
         )
 
 
-class ArchiveOrgHelper(DatasetPlatformHelper):
+class ArchiveOrgHelper(FilesetPlatformHelper):
 
     FORMAT_TO_MIMETYPE = {
         'BZIP': 'application/x-bzip',
@@ -623,7 +623,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper):
         self.session = internetarchive.get_session()
 
     @staticmethod
-    def want_item_file(f: dict, item_name: str) -> bool:
+    def want_item_file(f: internetarchive.File, item_name: str) -> bool:
         """
         Filters IA API files
         """
@@ -662,7 +662,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper):
                 return True
         return False
 
-    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> DatasetPlatformItem:
+    def process_request(self, request: dict, resource: Optional[ResourceResult], html_biblio: Optional[BiblioMetadata]) -> FilesetPlatformItem:
         """
         Fetch platform-specific metadata for this request (eg, via API calls)
         """
@@ -700,7 +700,7 @@ class ArchiveOrgHelper(DatasetPlatformHelper):
             )
             manifest.append(mf)
 
-        return DatasetPlatformItem(
+        return FilesetPlatformItem(
             platform_name=self.platform_name,
             platform_status='success',
             manifest=manifest,
@@ -710,10 +710,11 @@ class ArchiveOrgHelper(DatasetPlatformHelper):
             archiveorg_meta=dict(collection=item_collection),
         )
 
-    def chose_strategy(self, item: DatasetPlatformItem) -> IngestStrategy:
+    def chose_strategy(self, item: FilesetPlatformItem) -> IngestStrategy:
         """
         Don't use default strategy picker; we are always doing an 'existing' in this case.
         """
+        assert item.manifest is not None
         if len(item.manifest) == 1:
             # NOTE: code flow does not support ArchiveorgFilesetBundle for the
             # case of, eg, a single zipfile in an archive.org item
diff --git a/python/sandcrawler/fileset_strategies.py b/python/sandcrawler/fileset_strategies.py
index 43f1a53..2577d2b 100644
--- a/python/sandcrawler/fileset_strategies.py
+++ b/python/sandcrawler/fileset_strategies.py
@@ -12,7 +12,7 @@ import internetarchive
 from sandcrawler.html_metadata import BiblioMetadata
 from sandcrawler.ia import ResourceResult, WaybackClient, SavePageNowClient, fix_transfer_encoding
-from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, DatasetPlatformItem, ArchiveStrategyResult, PlatformScopeError
+from sandcrawler.fileset_types import IngestStrategy, FilesetManifestFile, FilesetPlatformItem, ArchiveStrategyResult, PlatformScopeError
 from sandcrawler.misc import gen_file_metadata, gen_file_metadata_path
 
@@ -22,10 +22,10 @@ class FilesetIngestStrategy():
         #self.ingest_strategy = 'unknown'
         pass
 
-    def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]:
+    def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]:
         raise NotImplementedError()
 
-    def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult:
+    def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
         raise NotImplementedError()
 
@@ -44,7 +44,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
 
         self.ia_session = internetarchive.get_session()
 
-    def check_existing(self, item: DatasetPlatformItem) -> Optional[ArchiveStrategyResult]:
+    def check_existing(self, item: FilesetPlatformItem) -> Optional[ArchiveStrategyResult]:
         """
         use API to check for item with all the files in the manifest
 
@@ -52,8 +52,9 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
         """
         ia_item = self.ia_session.get_item(item.archiveorg_item_name)
         if not ia_item.exists:
-            return False
+            return None
         item_files = ia_item.get_files(on_the_fly=False)
+        assert item.manifest
         for wanted in item.manifest:
             found = False
             for existing in item_files:
@@ -74,7 +75,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
             manifest=item.manifest,
         )
 
-    def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult:
+    def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
         """
         May require extra context to pass along to archive.org item creation.
         """
@@ -94,6 +95,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
             pass
 
         # 1. download all files locally
+        assert item.manifest
         for m in item.manifest:
 
             # XXX: enforce safe/sane filename
@@ -143,7 +145,7 @@ class ArchiveorgFilesetStrategy(FilesetIngestStrategy):
             m.status = 'verified-local'
 
         # 2. upload all files, with metadata
-        assert item.archiveorg_item_meta['collection']
+        assert item.archiveorg_item_meta and item.archiveorg_item_meta['collection']
         item_files = []
         for m in item.manifest:
             local_path = local_dir + '/' + m.path
@@ -212,7 +214,7 @@ class WebFilesetStrategy(FilesetIngestStrategy):
             "://s3-eu-west-1.amazonaws.com/",
         ]
 
-    def process(self, item: DatasetPlatformItem) -> ArchiveStrategyResult:
+    def process(self, item: FilesetPlatformItem) -> ArchiveStrategyResult:
         """
         For each manifest item individually, run 'fetch_resource' and record stats, terminal_url, terminal_dt
 
@@ -220,6 +222,7 @@ class WebFilesetStrategy(FilesetIngestStrategy):
         - full fetch_resource() method which can do SPN requests
         """
+        assert item.manifest
         for m in item.manifest:
             fetch_url = m.platform_url
             if not fetch_url:
@@ -241,7 +244,7 @@ class WebFilesetStrategy(FilesetIngestStrategy):
             print("[FETCH {:>6}] {}  {}".format(
                     via,
                     (resource and resource.status),
-                    (resource and resource.terminal_url) or url),
+                    (resource and resource.terminal_url) or fetch_url),
                 file=sys.stderr)
 
             m.terminal_url = resource.terminal_url
@@ -268,7 +271,7 @@ class WebFilesetStrategy(FilesetIngestStrategy):
         overall_status = "success"
         for m in item.manifest:
             if m.status != 'success':
-                overall_status = m.status
+                overall_status = m.status or 'not-processed'
                 break
         if not item.manifest:
             overall_status = 'empty-manifest'
diff --git a/python/sandcrawler/fileset_types.py b/python/sandcrawler/fileset_types.py
index 9fe8b0d..037843e 100644
--- a/python/sandcrawler/fileset_types.py
+++ b/python/sandcrawler/fileset_types.py
@@ -19,20 +19,20 @@ class FilesetManifestFile(BaseModel):
     sha1: Optional[str]
     sha256: Optional[str]
     mimetype: Optional[str]
+    extra: Optional[Dict[str, Any]]
 
     status: Optional[str]
     platform_url: Optional[str]
     terminal_url: Optional[str]
     terminal_dt: Optional[str]
-    extra: Optional[Dict[str, Any]]
 
-class DatasetPlatformItem(BaseModel):
+class FilesetPlatformItem(BaseModel):
     platform_name: str
     platform_status: str
-    manifest: Optional[List[FilesetManifestFile]]
-
     platform_domain: Optional[str]
     platform_id: Optional[str]
+    manifest: Optional[List[FilesetManifestFile]]
+
     archiveorg_item_name: Optional[str]
     archiveorg_item_meta: Optional[dict]
     web_base_url: Optional[str]
@@ -42,6 +42,13 @@ class ArchiveStrategyResult(BaseModel):
     ingest_strategy: str
     status: str
     manifest: List[FilesetManifestFile]
+    file_file_meta: Optional[Dict[str, Any]]
+    file_terminal: Optional[Dict[str, Any]]
+    file_cdx: Optional[Dict[str, Any]]
+    bundle_file_meta: Optional[Dict[str, Any]]
+    bundle_terminal: Optional[Any]
+    bundle_cdx: Optional[Any]
+    bundle_archiveorg_path: Optional[str]
 
 class PlatformScopeError(Exception):
     """
diff --git a/python/sandcrawler/ingest_fileset.py b/python/sandcrawler/ingest_fileset.py
index ce6cdca..de392b2 100644
--- a/python/sandcrawler/ingest_fileset.py
+++ b/python/sandcrawler/ingest_fileset.py
@@ -20,7 +20,7 @@ from sandcrawler.html_metadata import BiblioMetadata, html_extract_resources, ht
 from sandcrawler.workers import SandcrawlerWorker
 from sandcrawler.db import SandcrawlerPostgrestClient
 from sandcrawler.ingest_file import IngestFileWorker
-from sandcrawler.fileset_platforms import DatasetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE
+from sandcrawler.fileset_platforms import FilesetPlatformHelper, DATASET_PLATFORM_HELPER_TABLE
 from sandcrawler.fileset_strategies import FilesetIngestStrategy, FILESET_STRATEGY_HELPER_TABLE
 from sandcrawler.fileset_types import PlatformScopeError, PlatformRestrictedError
 
@@ -47,8 +47,8 @@ class IngestFilesetWorker(IngestFileWorker):
         self.sink = sink
         self.dataset_platform_helpers = DATASET_PLATFORM_HELPER_TABLE
         self.dataset_strategy_archivers = FILESET_STRATEGY_HELPER_TABLE
-        self.max_total_size = 100*1024*1024*1024
-        self.max_file_count = 500
+        self.max_total_size = kwargs.get('max_total_size', 64*1024*1024*1024)
+        self.max_file_count = kwargs.get('max_file_count', 200)
 
     def check_existing_ingest(self, ingest_type: str, base_url: str) -> Optional[dict]:
@@ -79,34 +79,15 @@ class IngestFilesetWorker(IngestFileWorker):
             return False
         return True
 
-    def process(self, request: dict, key: Any = None) -> dict:
-
-        ingest_type = request.get('ingest_type')
-        if ingest_type not in ("dataset",):
-            raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
-
-        # parse/clean URL
-        # note that we pass through the original/raw URL, and that is what gets
-        # persisted in database table
-        base_url = clean_url(request['base_url'])
-
-        force_recrawl = bool(request.get('force_recrawl', False))
-
-        print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
-
-        # TODO: "existing" check for new fileset ingest result table
-        #existing = self.check_existing_ingest(ingest_type, base_url)
-        #if existing:
-        #    return self.process_existing(request, existing)
+    def fetch_resource_iteratively(self, ingest_type: str, base_url: str, force_recrawl: bool) -> dict:
+        """
+        This is copypasta from process_file(), should probably refactor.
+        """
 
-        result: Dict[str, Any] = dict(request=request, hit=False)
+        result: Dict[str, Any] = dict(hit=False)
         result['hops'] = [base_url]
         next_url = base_url
 
-        # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
-
-        ### START COPYPASTA from process_file(), should refactor ###
-
         # check against blocklist
         for block in self.base_url_blocklist:
             # XXX: hack to not skip archive.org content
@@ -247,9 +228,42 @@ class IngestFilesetWorker(IngestFileWorker):
                 #raise NotImplementedError()
                 pass
 
-        ### END COPYPASTA ###
+        result['_html_biblio'] = html_biblio
+        result['_resource'] = resource
+        return result
+
+
+    def process(self, request: dict, key: Any = None) -> dict:
+
+        ingest_type = request.get('ingest_type')
+        if ingest_type not in ("dataset",):
+            raise NotImplementedError(f"can't handle ingest_type={ingest_type}")
+
+        # parse/clean URL
+        # note that we pass through the original/raw URL, and that is what gets
+        # persisted in database table
+        base_url = clean_url(request['base_url'])
+
+        force_recrawl = bool(request.get('force_recrawl', False))
+
+        print("[INGEST {:>6}] {}".format(ingest_type, base_url), file=sys.stderr)
+
+        # TODO: "existing" check against file and/or fileset ingest result table
+        #existing = self.check_existing_ingest(ingest_type, base_url)
+        #if existing:
+        #    return self.process_existing(request, existing)
+
+        result = self.fetch_resource_iteratively(ingest_type, base_url, force_recrawl=force_recrawl)
+        if result.get('status') != None:
+            result['request'] = request
+            return result
+
+        html_biblio = result.pop('_html_biblio')
+        resource = result.pop('_resource')
+
+        # 1. Determine `platform`, which may involve resolving redirects and crawling a landing page.
 
-        # XXX: html_guess_platform()
+        # TODO: could involve html_guess_platform() here?
 
         # determine platform
         platform_helper = None
@@ -291,10 +305,10 @@ class IngestFilesetWorker(IngestFileWorker):
         #print(dataset_meta, file=sys.stderr)
         platform = dataset_meta.platform_name
 
-        result['platform'] = dataset_meta.platform_name
+        result['platform_name'] = dataset_meta.platform_name
+        result['platform_domain'] = dataset_meta.platform_domain
         result['platform_id'] = dataset_meta.platform_id
-        result['item_name'] = dataset_meta.archiveorg_item_name
-        result['item_meta'] = dataset_meta.archiveorg_item_meta
+        result['archiveorg_item_name'] = dataset_meta.archiveorg_item_name
 
         if not dataset_meta.manifest:
             result['status'] = 'empty-manifest'
@@ -309,6 +323,9 @@ class IngestFilesetWorker(IngestFileWorker):
             result['status'] = 'too-large-size'
             return result
         if result['file_count'] > self.max_file_count:
+            # hard max, to prevent downstream breakage
+            if result['file_count'] > 10*1000:
+                result['manifest'] = result['manifest'][:self.max_file_count]
             result['status'] = 'too-many-files'
             return result
 
@@ -327,8 +344,46 @@ class IngestFilesetWorker(IngestFileWorker):
         # 4. Summarize status and return structured result metadata.
         result['status'] = archive_result.status
         result['manifest'] = [m.dict() for m in archive_result.manifest]
-        result['file_count'] = len(archive_result.manifest) or None
-        result['total_size'] = sum([m.size for m in archive_result.manifest if m.size]) or None
+
+        if ingest_strategy.endswith('-fileset-bundle'):
+            result['fileset_bundle'] = dict(
+                file_meta=archive_result.bundle_file_meta,
+                archiveorg_bundle_path=archive_result.archiveorg_bundle_path,
+            )
+            if archive_result.bundle_terminal:
+                result['fileset_bundle']['terminal'] = dict(
+                    terminal_url=archive_result.bundle_terminal.terminal_url,
+                    terminal_dt=archive_result.bundle_terminal.terminal_dt,
+                    terminal_status_code=archive_result.bundle_terminal.terminal_status_code,
+                )
+            if archive_result.bundle_cdx:
+                result['fileset_bundle']['cdx'] = cdx_to_dict(archive_result.bundle_cdx)
+                if archive_result.bundle_cdx.revisit_cdx:
+                    result['fileset_bundle']['revisit_cdx'] = cdx_to_dict(archive_result.bundle_cdx.revisit_cdx)
+        if ingest_strategy.endswith('-file'):
+            result['fileset_file'] = dict(
+                file_meta=archive_result.file_file_meta,
+            )
+            if archive_result.file_terminal:
+                result['fileset_file']['terminal'] = dict(
+                    terminal_url=archive_result.file_terminal.terminal_url,
+                    terminal_dt=archive_result.file_terminal.terminal_dt,
+                    terminal_status_code=archive_result.file_terminal.terminal_status_code,
+                )
+            if archive_result.file_cdx:
+                result['fileset_file']['cdx'] = cdx_to_dict(archive_result.file_cdx)
+                if archive_result.file_cdx.revisit_cdx:
+                    result['fileset_file']['revisit_cdx'] = cdx_to_dict(archive_result.file_cdx.revisit_cdx)
+
+        if result['status'].startswith('success'):
+            # check that these are still valid
+            assert result['file_count'] == len(archive_result.manifest)
+            assert result['total_size'] == sum([m.size for m in archive_result.manifest if m.size])
+
+
+        # XXX: here is where we would publish to ingest file result topic... or call process_hit()?
+        if result['status'] == 'success-file':
+            pass
 
         if result['status'].startswith('success'):
             result['hit'] = True
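For reference: the proposal hunk above makes `platform_name`, `platform_domain`, and `platform_id` NOT NULL in `ingest_fileset_platform`, and states that the persist worker should only insert a row when all three were extracted. A minimal sketch of that guard, not part of this commit; the helper name and row-building are hypothetical, with result keys taken from the worker changes above:

    from typing import Any, Dict, Optional

    REQUIRED_PLATFORM_FIELDS = ("platform_name", "platform_domain", "platform_id")

    def fileset_platform_row(result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        # Hypothetical persist-side helper: build a row for the
        # ingest_fileset_platform table, or return None (insert nothing)
        # when the NOT NULL platform columns were not extracted.
        if not all(result.get(k) for k in REQUIRED_PLATFORM_FIELDS):
            return None
        return {
            "ingest_type": result["request"]["ingest_type"],
            "base_url": result["request"]["base_url"],
            "hit": result.get("hit", False),
            "status": result.get("status"),
            "platform_name": result["platform_name"],
            "platform_domain": result["platform_domain"],
            "platform_id": result["platform_id"],
            "ingest_strategy": result.get("ingest_strategy"),
            "total_size": result.get("total_size"),
            "file_count": result.get("file_count"),
            "archiveorg_item_name": result.get("archiveorg_item_name"),
        }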

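Similarly, a sketch of the fileset-specific fields the reworked `process()` emits for a `*-fileset-bundle` strategy, written as an illustrative dict: the field names come from the diff, every value below is a made-up placeholder, and the strategy string is an assumption, not confirmed by this commit.

    # Illustrative shape only; see the fileset_bundle/fileset_file handling
    # in ingest_fileset.py above for how these fields are populated.
    example_result = {
        "status": "success",
        "hit": True,
        "platform_name": "dataverse",
        "platform_domain": "dataverse.example.edu",   # placeholder host
        "platform_id": "doi:10.1234/EXAMPLE/ABC",     # placeholder persistentId
        "ingest_strategy": "web-fileset-bundle",      # assumed strategy name
        "file_count": 3,
        "total_size": 123456,
        "manifest": [
            # one dict per FilesetManifestFile: path, size, md5, sha1, sha256,
            # mimetype, status, platform_url, terminal_url, terminal_dt, extra
        ],
        "fileset_bundle": {
            "file_meta": {},           # file_meta dict for the bundle file
            "archiveorg_bundle_path": None,
            "terminal": {
                "terminal_url": "https://example.com/bundle.zip",
                "terminal_dt": "20210913000000",
                "terminal_status_code": 200,
            },
            "cdx": {},                 # cdx_to_dict() output
            "revisit_cdx": {},         # only when the CDX row is a revisit
        },
    }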