diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2022-04-07 14:44:01 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2022-04-07 14:44:01 -0700 | 
| commit | ede98644a89afd15d903061e0998dbd08851df6d (patch) | |
| tree | 17c54c5764adb2f5d67aa750174f635e0fb1cdc8 /python/fatcat_tools | |
| parent | 2ef72e0c769e94401568ab42def30ddb5268fa98 (diff) | |
| parent | 0aaa2a839d7a14716ee1a84b730203a7953dc5e0 (diff) | |
| download | fatcat-ede98644a89afd15d903061e0998dbd08851df6d.tar.gz fatcat-ede98644a89afd15d903061e0998dbd08851df6d.zip | |
Merge branch 'bnewbold-dataset-ingest-fixes'
Diffstat (limited to 'python/fatcat_tools')
| -rw-r--r-- | python/fatcat_tools/importers/__init__.py | 1 | ||||
| -rw-r--r-- | python/fatcat_tools/importers/common.py | 11 | ||||
| -rw-r--r-- | python/fatcat_tools/importers/ingest.py | 273 | 
3 files changed, 250 insertions, 35 deletions
| diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 654be2e9..e13ab552 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -37,6 +37,7 @@ from .fileset_generic import FilesetImporter  from .grobid_metadata import GrobidMetadataImporter  from .ingest import (      IngestFileResultImporter, +    IngestFilesetFileResultImporter,      IngestFilesetResultImporter,      IngestWebResultImporter,      SavePaperNowFileImporter, diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index cd51a24c..2136d1da 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -916,3 +916,14 @@ def make_kafka_consumer(      )      print("Consuming from kafka topic {}, group {}".format(topic_name, group))      return consumer + + +def filesets_very_similar(a: FilesetEntity, b: FilesetEntity) -> bool: +    """ +    This helper method checks if two Fileset entities are effectively equivalent: same set of files with comparable hashes. + +    Uses a set() of SHA1 hashes to test for equivalence. +    """ +    a_hashes = set([f.sha1 for f in a.manifest]) +    b_hashes = set([f.sha1 for f in b.manifest]) +    return a_hashes == b_hashes diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py index 4f1cc3c4..c1fed31f 100644 --- a/python/fatcat_tools/importers/ingest.py +++ b/python/fatcat_tools/importers/ingest.py @@ -11,7 +11,7 @@ from fatcat_openapi_client import (      WebcaptureEntity,  ) -from .common import EntityImporter, make_rel_url +from .common import EntityImporter, filesets_very_similar, make_rel_url  class IngestFileResultImporter(EntityImporter): @@ -260,6 +260,16 @@ class IngestFileResultImporter(EntityImporter):                  edit_extra["grobid_status_code"] = row["grobid"]["status_code"]                  edit_extra["grobid_version"] = row["grobid"].get("grobid_version") +        # fileset/platform metadata +        if row.get("ingest_strategy"): +            edit_extra["ingest_strategy"] = row["ingest_strategy"] +        if row.get("platform_domain"): +            edit_extra["platform_domain"] = row["platform_domain"] +        if row.get("platform_name"): +            edit_extra["platform_name"] = row["platform_name"] +        if row.get("platform_id"): +            edit_extra["platform_id"] = row["platform_id"] +          return edit_extra      def parse_record(self, row: Dict[str, Any]) -> FileEntity: @@ -518,7 +528,6 @@ class IngestWebResultImporter(IngestFileResultImporter):          )          edit_extra = self.parse_edit_extra(row) -          if edit_extra:              wc.edit_extra = edit_extra          return wc @@ -675,9 +684,9 @@ class IngestFilesetResultImporter(IngestFileResultImporter):          return True      def parse_fileset_urls(self, row: Dict[str, Any]) -> List[FilesetUrl]: -        if not row.get("strategy"): +        if not row.get("ingest_strategy"):              return [] -        strategy = row["strategy"] +        strategy = row["ingest_strategy"]          urls = []          if strategy == "archiveorg-fileset" and row.get("archiveorg_item_name"):              urls.append( @@ -686,17 +695,14 @@ class IngestFilesetResultImporter(IngestFileResultImporter):                      rel="archive-base",                  )              ) -        if row["strategy"].startswith("web-") and row.get("platform_base_url"): +        if strategy.startswith("web-") and row.get("platform_base_url"):              urls.append(                  fatcat_openapi_client.FilesetUrl(                      url=f"https://web.archive.org/web/{row['web_base_url_dt']}/{row['web_base_url']}",                      rel="webarchive-base",                  )              ) -        # TODO: repository-base -        # TODO: web-base - -        if row["strategy"] == "archiveorg-fileset-bundle" and row.get("archiveorg_item_name"): +        if strategy == "archiveorg-fileset-bundle" and row.get("archiveorg_item_name"):              urls.append(                  fatcat_openapi_client.FilesetUrl(                      url=f"https://archive.org/download/{row['archiveorg_item_name']}/{row['archiveorg_bundle_path']}", @@ -704,7 +710,7 @@ class IngestFilesetResultImporter(IngestFileResultImporter):                  )              ) -        if row["strategy"] == "web-fileset-bundle" and row.get("platform_bundle_url"): +        if strategy == "web-fileset-bundle" and row.get("platform_bundle_url"):              urls.append(                  fatcat_openapi_client.FilesetUrl(                      url=f"https://web.archive.org/web/{row['web_bundle_url_dt']}/{row['web_bundle_url']}", @@ -727,6 +733,15 @@ class IngestFilesetResultImporter(IngestFileResultImporter):                      rel="repository-base",                  )              ) +        elif row.get("terminal"): +            # fallback generic web URL +            urls.append( +                fatcat_openapi_client.FilesetUrl( +                    url=row["terminal"]["terminal_url"], +                    rel="web", +                ) +            ) +          return urls      def parse_record(self, row: Dict[str, Any]) -> FilesetEntity: @@ -748,12 +763,6 @@ class IngestFilesetResultImporter(IngestFileResultImporter):              return None          entity_extra: Dict[str, Any] = dict() -        edit_extra = self.parse_edit_extra(row) -        edit_extra["ingest_strategy"] = row["ingest_strategy"] -        if row.get("platform"): -            edit_extra["platform"] = row["platform"] -        if row.get("platform_id"): -            edit_extra["platform_id"] = row["platform_id"]          entity_urls = self.parse_fileset_urls(row)          if not entity_urls: @@ -770,33 +779,33 @@ class IngestFilesetResultImporter(IngestFileResultImporter):              fsf = fatcat_openapi_client.FilesetFile(                  path=ingest_file["path"],                  size=ingest_file["size"], -                md5=ingest_file["md5"], -                sha1=ingest_file["sha1"], +                md5=ingest_file.get("md5"), +                sha1=ingest_file.get("sha1"),                  sha256=ingest_file.get("sha256"), -                extra=dict( -                    mimetype=ingest_file["mimetype"], -                ), +                mimetype=ingest_file.get("mimetype"), +                extra=dict(),              ) -            if not (fsf.md5 and fsf.sha1 and fsf.path and fsf.size): +            if not (fsf.md5 and fsf.sha1 and fsf.path and fsf.size and fsf.mimetype):                  self.counts["skip-partial-file-info"] += 1                  return None              if ingest_file.get("platform_url"): -                # XXX: should we include this?                  fsf.extra["original_url"] = ingest_file["platform_url"]              if ingest_file.get("terminal_url") and ingest_file.get("terminal_dt"):                  fsf.extra[                      "wayback_url"                  ] = f"https://web.archive.org/web/{ingest_file['terminal_dt']}/{ingest_file['terminal_url']}" +            if not fsf.extra: +                fsf.extra = None              manifest.append(fsf)          fe = fatcat_openapi_client.FilesetEntity(              manifest=manifest,              urls=entity_urls,              release_ids=[release_ident], +            extra=entity_extra or None,          ) -        if entity_extra: -            fe.extra = entity_extra +        edit_extra = self.parse_edit_extra(row)          if edit_extra:              fe.edit_extra = edit_extra          return fe @@ -805,26 +814,29 @@ class IngestFilesetResultImporter(IngestFileResultImporter):          # check for existing edits-in-progress with same URL          for other in self._entity_queue: -            # XXX: how to duplicate check? -            if other.original_url == fse.original_url: +            if filesets_very_similar(other, fse):                  self.counts["skip-in-queue"] += 1 +                self.counts["skip"] += 1                  return False          # lookup sha1, or create new entity (TODO: API doesn't support this yet)          # existing = None          # NOTE: in lieu of existing checks (by lookup), only allow one fileset per release -        release = self.api.get_release(fse.release_ids[0], expand="filesets") -        if release.filesets: -            # XXX: how to duplicate check filesets? +        if not self.bezerk_mode: +            release = self.api.get_release(fse.release_ids[0], expand="filesets") +              # check if this is an existing match, or just a similar hit -            for other in release.filesets: -                if fse.original_url == other.original_url: -                    # TODO: compare very similar timestamps of same time (different formats) +            for other in release.filesets or []: +                if filesets_very_similar(other, fse):                      self.counts["exists"] += 1                      return False -            self.counts["skip-release-has-fileset"] += 1 -            return False + +            # for now, being conservative and just skipping if release has any other fileset +            if release.filesets: +                self.counts["skip-release-has-fileset"] += 1 +                self.counts["skip"] += 1 +                return False          return True @@ -849,6 +861,197 @@ class IngestFilesetResultImporter(IngestFileResultImporter):              ) +class IngestFilesetFileResultImporter(IngestFileResultImporter): +    """ +    Variant of IngestFileResultImporter for processing dataset (Fileset) ingest +    results, which resulted in a single file, into File entities. +    """ + +    def __init__(self, api: ApiClient, **kwargs) -> None: + +        eg_desc = ( +            kwargs.pop("editgroup_description", None) +            or "Single files crawled from web using sandcrawler ingest tool, in dataset mode" +        ) +        eg_extra = kwargs.pop("editgroup_extra", dict()) +        eg_extra["agent"] = eg_extra.get( +            "agent", "fatcat_tools.IngestFilesetFileResultImporter" +        ) +        kwargs["do_updates"] = False +        super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs) +        self.max_file_count = 300 + +    def want_fileset(self, row: Dict[str, Any]) -> bool: + +        manifest: Optional[List[Any]] = row.get("manifest") +        if not manifest or len(manifest) == 0: +            self.counts["skip-empty-manifest"] += 1 +            return False + +        if len(manifest) > 1: +            self.counts["skip-multiple-files"] += 1 +            return False + +        assert len(manifest) == 1 +        return True + +    def want(self, row: Dict[str, Any]) -> bool: + +        if not self.want_ingest(row): +            return False + +        if row.get("status") != "success-file": +            self.counts["skip-status"] += 1 +            return False + +        # fileset-specific filters +        if row["request"].get("ingest_type") not in [ +            "dataset", +        ]: +            self.counts["skip-ingest-type"] += 1 +            return False + +        if not self.want_fileset(row): +            return False + +        return True + +    def parse_fileset_urls(self, row: Dict[str, Any]) -> List[FilesetUrl]: +        if not row.get("ingest_strategy"): +            return [] +        strategy = row["ingest_strategy"] +        urls = [] +        # XXX +        if strategy == "archiveorg-fileset" and row.get("archiveorg_item_name"): +            urls.append( +                fatcat_openapi_client.FilesetUrl( +                    url=f"https://archive.org/download/{row['archiveorg_item_name']}/", +                    rel="archive-base", +                ) +            ) +        if strategy.startswith("web-") and row.get("platform_base_url"): +            urls.append( +                fatcat_openapi_client.FilesetUrl( +                    url=f"https://web.archive.org/web/{row['web_base_url_dt']}/{row['web_base_url']}", +                    rel="webarchive-base", +                ) +            ) +        if strategy == "archiveorg-fileset-bundle" and row.get("archiveorg_item_name"): +            urls.append( +                fatcat_openapi_client.FilesetUrl( +                    url=f"https://archive.org/download/{row['archiveorg_item_name']}/{row['archiveorg_bundle_path']}", +                    rel="archive-bundle", +                ) +            ) + +        if strategy == "web-fileset-bundle" and row.get("platform_bundle_url"): +            urls.append( +                fatcat_openapi_client.FilesetUrl( +                    url=f"https://web.archive.org/web/{row['web_bundle_url_dt']}/{row['web_bundle_url']}", +                    rel="webarchive-bundle", +                ) +            ) + +        # add any additional / platform URLs here +        if row.get("platform_bundle_url"): +            urls.append( +                fatcat_openapi_client.FilesetUrl( +                    url=row["platform_bundle_url"], +                    rel="repository-bundle", +                ) +            ) +        if row.get("platform_base_url"): +            urls.append( +                fatcat_openapi_client.FilesetUrl( +                    url=row["platform_bundle_url"], +                    rel="repository-base", +                ) +            ) +        elif row.get("terminal"): +            # fallback generic web URL +            urls.append( +                fatcat_openapi_client.FilesetUrl( +                    url=row["terminal"]["terminal_url"], +                    rel="web", +                ) +            ) + +        return urls + +    def parse_record(self, row: Dict[str, Any]) -> FileEntity: + +        request = row["request"] + +        # double check that want() filtered request correctly +        if request.get("ingest_type") not in [ +            "dataset", +        ]: +            self.counts["skip-ingest-type"] += 1 +            return None + +        # identify release by fatcat ident, or extid lookup +        release_ident = self.parse_ingest_release_ident(row) + +        if not release_ident: +            self.counts["skip-release-not-found"] += 1 +            return None + +        assert row["file_count"] == len(row["manifest"]) == 1 +        file_meta = row["manifest"][0] +        # print(file_meta) +        assert file_meta["status"] == "success" + +        # add file-level access URLs +        entity_urls = [] +        if file_meta.get("platform_url"): +            entity_urls.append(FileUrl(rel="web", url=file_meta["platform_url"])) +        if file_meta.get("terminal_url") and file_meta.get("terminal_dt"): +            entity_urls.append( +                FileUrl( +                    rel="webarchive", +                    url=f"https://web.archive.org/web/{file_meta['terminal_dt']}/{file_meta['terminal_url']}", +                ) +            ) +        if row["ingest_strategy"] == "archiveorg-file": +            entity_urls.append( +                FileUrl( +                    rel="archive", +                    url=f"https://archive.org/download/{row['archiveorg_item_name']}/{file_meta['path']}", +                ) +            ) + +        if not entity_urls: +            self.counts["skip-no-access-url"] += 1 +            return None + +        entity_extra: Dict[str, Any] = dict() +        entity_extra["path"] = file_meta["path"] + +        # this is to work around a bug in old sandcrawler ingest code +        if file_meta["md5"] == file_meta["sha1"]: +            self.counts["skip-bad-hashes"] += 1 +            return None + +        fe = FileEntity( +            md5=file_meta["md5"], +            sha1=file_meta["sha1"], +            sha256=file_meta["sha256"], +            size=file_meta["size"], +            mimetype=file_meta["mimetype"], +            release_ids=[release_ident], +            urls=entity_urls, +            extra=entity_extra or None, +        ) +        if not (fe.md5 and fe.sha1 and fe.sha256 and (fe.size is not None) and fe.mimetype): +            self.counts["skip-partial-file-info"] += 1 +            return None + +        edit_extra = self.parse_edit_extra(row) +        if edit_extra: +            fe.edit_extra = edit_extra +        return fe + +  class SavePaperNowFilesetImporter(IngestFilesetResultImporter):      """      Like SavePaperNowFileImporter, but for fileset/dataset ingest. | 
