author     Bryan Newbold <bnewbold@robocracy.org>    2022-04-07 14:44:01 -0700
committer  Bryan Newbold <bnewbold@robocracy.org>    2022-04-07 14:44:01 -0700
commit     ede98644a89afd15d903061e0998dbd08851df6d (patch)
tree       17c54c5764adb2f5d67aa750174f635e0fb1cdc8 /python/fatcat_tools
parent     2ef72e0c769e94401568ab42def30ddb5268fa98 (diff)
parent     0aaa2a839d7a14716ee1a84b730203a7953dc5e0 (diff)
Merge branch 'bnewbold-dataset-ingest-fixes'
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--  python/fatcat_tools/importers/__init__.py |   1
-rw-r--r--  python/fatcat_tools/importers/common.py   |  11
-rw-r--r--  python/fatcat_tools/importers/ingest.py   | 273
3 files changed, 250 insertions, 35 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 654be2e9..e13ab552 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -37,6 +37,7 @@ from .fileset_generic import FilesetImporter
 from .grobid_metadata import GrobidMetadataImporter
 from .ingest import (
     IngestFileResultImporter,
+    IngestFilesetFileResultImporter,
     IngestFilesetResultImporter,
     IngestWebResultImporter,
     SavePaperNowFileImporter,
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index cd51a24c..2136d1da 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -916,3 +916,14 @@ def make_kafka_consumer(
     )
     print("Consuming from kafka topic {}, group {}".format(topic_name, group))
     return consumer
+
+
+def filesets_very_similar(a: FilesetEntity, b: FilesetEntity) -> bool:
+    """
+    This helper method checks if two Fileset entities are effectively equivalent: same set of files with comparable hashes.
+
+    Uses a set() of SHA1 hashes to test for equivalence.
+    """
+    a_hashes = set([f.sha1 for f in a.manifest])
+    b_hashes = set([f.sha1 for f in b.manifest])
+    return a_hashes == b_hashes
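Note: the new filesets_very_similar() helper compares manifests only by the set of SHA1 hashes, so file paths, ordering, and the other hash fields are ignored. A minimal usage sketch (not from the repository; hash values are shortened placeholders, and only the fields needed for the comparison are set):

    from fatcat_openapi_client import FilesetEntity, FilesetFile

    a = FilesetEntity(manifest=[
        FilesetFile(path="data/results.csv", size=2048, sha1="da39a3ee5e6b"),
        FilesetFile(path="data/README.txt", size=120, sha1="356a192b7913"),
    ])
    b = FilesetEntity(manifest=[
        # same two files, different paths and ordering
        FilesetFile(path="README.txt", size=120, sha1="356a192b7913"),
        FilesetFile(path="results.csv", size=2048, sha1="da39a3ee5e6b"),
    ])
    assert filesets_very_similar(a, b)  # True: the SHA1 sets match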
row.get("archiveorg_item_name"): urls.append( fatcat_openapi_client.FilesetUrl( url=f"https://archive.org/download/{row['archiveorg_item_name']}/{row['archiveorg_bundle_path']}", @@ -704,7 +710,7 @@ class IngestFilesetResultImporter(IngestFileResultImporter): ) ) - if row["strategy"] == "web-fileset-bundle" and row.get("platform_bundle_url"): + if strategy == "web-fileset-bundle" and row.get("platform_bundle_url"): urls.append( fatcat_openapi_client.FilesetUrl( url=f"https://web.archive.org/web/{row['web_bundle_url_dt']}/{row['web_bundle_url']}", @@ -727,6 +733,15 @@ class IngestFilesetResultImporter(IngestFileResultImporter): rel="repository-base", ) ) + elif row.get("terminal"): + # fallback generic web URL + urls.append( + fatcat_openapi_client.FilesetUrl( + url=row["terminal"]["terminal_url"], + rel="web", + ) + ) + return urls def parse_record(self, row: Dict[str, Any]) -> FilesetEntity: @@ -748,12 +763,6 @@ class IngestFilesetResultImporter(IngestFileResultImporter): return None entity_extra: Dict[str, Any] = dict() - edit_extra = self.parse_edit_extra(row) - edit_extra["ingest_strategy"] = row["ingest_strategy"] - if row.get("platform"): - edit_extra["platform"] = row["platform"] - if row.get("platform_id"): - edit_extra["platform_id"] = row["platform_id"] entity_urls = self.parse_fileset_urls(row) if not entity_urls: @@ -770,33 +779,33 @@ class IngestFilesetResultImporter(IngestFileResultImporter): fsf = fatcat_openapi_client.FilesetFile( path=ingest_file["path"], size=ingest_file["size"], - md5=ingest_file["md5"], - sha1=ingest_file["sha1"], + md5=ingest_file.get("md5"), + sha1=ingest_file.get("sha1"), sha256=ingest_file.get("sha256"), - extra=dict( - mimetype=ingest_file["mimetype"], - ), + mimetype=ingest_file.get("mimetype"), + extra=dict(), ) - if not (fsf.md5 and fsf.sha1 and fsf.path and fsf.size): + if not (fsf.md5 and fsf.sha1 and fsf.path and fsf.size and fsf.mimetype): self.counts["skip-partial-file-info"] += 1 return None if ingest_file.get("platform_url"): - # XXX: should we include this? fsf.extra["original_url"] = ingest_file["platform_url"] if ingest_file.get("terminal_url") and ingest_file.get("terminal_dt"): fsf.extra[ "wayback_url" ] = f"https://web.archive.org/web/{ingest_file['terminal_dt']}/{ingest_file['terminal_url']}" + if not fsf.extra: + fsf.extra = None manifest.append(fsf) fe = fatcat_openapi_client.FilesetEntity( manifest=manifest, urls=entity_urls, release_ids=[release_ident], + extra=entity_extra or None, ) - if entity_extra: - fe.extra = entity_extra + edit_extra = self.parse_edit_extra(row) if edit_extra: fe.edit_extra = edit_extra return fe @@ -805,26 +814,29 @@ class IngestFilesetResultImporter(IngestFileResultImporter): # check for existing edits-in-progress with same URL for other in self._entity_queue: - # XXX: how to duplicate check? - if other.original_url == fse.original_url: + if filesets_very_similar(other, fse): self.counts["skip-in-queue"] += 1 + self.counts["skip"] += 1 return False # lookup sha1, or create new entity (TODO: API doesn't support this yet) # existing = None # NOTE: in lieu of existing checks (by lookup), only allow one fileset per release - release = self.api.get_release(fse.release_ids[0], expand="filesets") - if release.filesets: - # XXX: how to duplicate check filesets? 
@@ -805,26 +814,29 @@ class IngestFilesetResultImporter(IngestFileResultImporter):
 
         # check for existing edits-in-progress with same URL
         for other in self._entity_queue:
-            # XXX: how to duplicate check?
-            if other.original_url == fse.original_url:
+            if filesets_very_similar(other, fse):
                 self.counts["skip-in-queue"] += 1
+                self.counts["skip"] += 1
                 return False
 
         # lookup sha1, or create new entity (TODO: API doesn't support this yet)
         # existing = None
 
         # NOTE: in lieu of existing checks (by lookup), only allow one fileset per release
-        release = self.api.get_release(fse.release_ids[0], expand="filesets")
-        if release.filesets:
-            # XXX: how to duplicate check filesets?
+        if not self.bezerk_mode:
+            release = self.api.get_release(fse.release_ids[0], expand="filesets")
+
             # check if this is an existing match, or just a similar hit
-            for other in release.filesets:
-                if fse.original_url == other.original_url:
-                    # TODO: compare very similar timestamps of same time (different formats)
+            for other in release.filesets or []:
+                if filesets_very_similar(other, fse):
                     self.counts["exists"] += 1
                     return False
-            self.counts["skip-release-has-fileset"] += 1
-            return False
+
+            # for now, being conservative and just skipping if release has any other fileset
+            if release.filesets:
+                self.counts["skip-release-has-fileset"] += 1
+                self.counts["skip"] += 1
+                return False
 
         return True
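The reworked try_update() duplicate handling above reduces to a small decision table. A standalone sketch of that policy, reusing the filesets_very_similar() helper added in common.py (the decide() function and its argument names are illustrative, not part of the codebase); in bezerk_mode the release lookup is skipped, so only the in-queue check applies:

    def decide(candidate, queued, existing_filesets):
        # same SHA1 manifest already waiting in the import queue
        if any(filesets_very_similar(other, candidate) for other in queued):
            return "skip-in-queue"
        # same SHA1 manifest already attached to the release
        if any(filesets_very_similar(other, candidate) for other in existing_filesets):
            return "exists"
        # conservative: don't attach a second, different fileset to the release
        if existing_filesets:
            return "skip-release-has-fileset"
        return "create"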
url=row["platform_bundle_url"], + rel="repository-bundle", + ) + ) + if row.get("platform_base_url"): + urls.append( + fatcat_openapi_client.FilesetUrl( + url=row["platform_bundle_url"], + rel="repository-base", + ) + ) + elif row.get("terminal"): + # fallback generic web URL + urls.append( + fatcat_openapi_client.FilesetUrl( + url=row["terminal"]["terminal_url"], + rel="web", + ) + ) + + return urls + + def parse_record(self, row: Dict[str, Any]) -> FileEntity: + + request = row["request"] + + # double check that want() filtered request correctly + if request.get("ingest_type") not in [ + "dataset", + ]: + self.counts["skip-ingest-type"] += 1 + return None + + # identify release by fatcat ident, or extid lookup + release_ident = self.parse_ingest_release_ident(row) + + if not release_ident: + self.counts["skip-release-not-found"] += 1 + return None + + assert row["file_count"] == len(row["manifest"]) == 1 + file_meta = row["manifest"][0] + # print(file_meta) + assert file_meta["status"] == "success" + + # add file-level access URLs + entity_urls = [] + if file_meta.get("platform_url"): + entity_urls.append(FileUrl(rel="web", url=file_meta["platform_url"])) + if file_meta.get("terminal_url") and file_meta.get("terminal_dt"): + entity_urls.append( + FileUrl( + rel="webarchive", + url=f"https://web.archive.org/web/{file_meta['terminal_dt']}/{file_meta['terminal_url']}", + ) + ) + if row["ingest_strategy"] == "archiveorg-file": + entity_urls.append( + FileUrl( + rel="archive", + url=f"https://archive.org/download/{row['archiveorg_item_name']}/{file_meta['path']}", + ) + ) + + if not entity_urls: + self.counts["skip-no-access-url"] += 1 + return None + + entity_extra: Dict[str, Any] = dict() + entity_extra["path"] = file_meta["path"] + + # this is to work around a bug in old sandcrawler ingest code + if file_meta["md5"] == file_meta["sha1"]: + self.counts["skip-bad-hashes"] += 1 + return None + + fe = FileEntity( + md5=file_meta["md5"], + sha1=file_meta["sha1"], + sha256=file_meta["sha256"], + size=file_meta["size"], + mimetype=file_meta["mimetype"], + release_ids=[release_ident], + urls=entity_urls, + extra=entity_extra or None, + ) + if not (fe.md5 and fe.sha1 and fe.sha256 and (fe.size is not None) and fe.mimetype): + self.counts["skip-partial-file-info"] += 1 + return None + + edit_extra = self.parse_edit_extra(row) + if edit_extra: + fe.edit_extra = edit_extra + return fe + + class SavePaperNowFilesetImporter(IngestFilesetResultImporter): """ Like SavePaperNowFileImporter, but for fileset/dataset ingest. |