path: root/python/fatcat_tools
author:    Bryan Newbold <bnewbold@robocracy.org>  2022-04-07 14:44:01 -0700
committer: Bryan Newbold <bnewbold@robocracy.org>  2022-04-07 14:44:01 -0700
commit:    ede98644a89afd15d903061e0998dbd08851df6d (patch)
tree:      17c54c5764adb2f5d67aa750174f635e0fb1cdc8 /python/fatcat_tools
parent:    2ef72e0c769e94401568ab42def30ddb5268fa98 (diff)
parent:    0aaa2a839d7a14716ee1a84b730203a7953dc5e0 (diff)
Merge branch 'bnewbold-dataset-ingest-fixes'
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--  python/fatcat_tools/importers/__init__.py  |   1
-rw-r--r--  python/fatcat_tools/importers/common.py     |  11
-rw-r--r--  python/fatcat_tools/importers/ingest.py     | 273
3 files changed, 250 insertions(+), 35 deletions(-)
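
For orientation (not part of the patch): the importers below consume sandcrawler ingest result rows. A minimal illustrative row, using only field names referenced in this diff and made-up values, might look like:

# Hypothetical sandcrawler fileset ingest result row; field names come from
# the code in this diff, all values are invented placeholders.
example_row = {
    "status": "success-file",
    "request": {"ingest_type": "dataset"},
    "ingest_strategy": "archiveorg-file",
    "platform_name": "example-platform",
    "platform_domain": "repo.example.com",
    "platform_id": "example-id-123",
    "archiveorg_item_name": "example-item",
    "file_count": 1,
    "manifest": [
        {
            "path": "data.csv",
            "size": 1234,
            "md5": "0" * 32,      # placeholder hash
            "sha1": "0" * 40,     # placeholder hash
            "sha256": "0" * 64,   # placeholder hash
            "mimetype": "text/csv",
            "status": "success",
        }
    ],
    "terminal": {"terminal_url": "https://repo.example.com/dataset/1"},
}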
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 654be2e9..e13ab552 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -37,6 +37,7 @@ from .fileset_generic import FilesetImporter
from .grobid_metadata import GrobidMetadataImporter
from .ingest import (
IngestFileResultImporter,
+ IngestFilesetFileResultImporter,
IngestFilesetResultImporter,
IngestWebResultImporter,
SavePaperNowFileImporter,
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index cd51a24c..2136d1da 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -916,3 +916,14 @@ def make_kafka_consumer(
)
print("Consuming from kafka topic {}, group {}".format(topic_name, group))
return consumer
+
+
+def filesets_very_similar(a: FilesetEntity, b: FilesetEntity) -> bool:
+ """
+ This helper method checks if two Fileset entities are effectively equivalent: same set of files with comparable hashes.
+
+ Uses a set() of SHA1 hashes to test for equivalence.
+ """
+ a_hashes = set([f.sha1 for f in a.manifest])
+ b_hashes = set([f.sha1 for f in b.manifest])
+ return a_hashes == b_hashes
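
As a rough usage sketch for the new helper (not part of the patch; assumes the change above has been applied, and the SHA-1 values are placeholders):

from fatcat_openapi_client import FilesetEntity, FilesetFile

from fatcat_tools.importers.common import filesets_very_similar

# two filesets listing the same files in a different order; only the set of
# SHA-1 hashes is compared, so they are treated as equivalent
a = FilesetEntity(
    manifest=[
        FilesetFile(path="data.csv", size=10, sha1="aaa"),
        FilesetFile(path="README.txt", size=5, sha1="bbb"),
    ]
)
b = FilesetEntity(
    manifest=[
        FilesetFile(path="README.txt", size=5, sha1="bbb"),
        FilesetFile(path="data.csv", size=10, sha1="aaa"),
    ]
)
assert filesets_very_similar(a, b)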
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index 4f1cc3c4..c1fed31f 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -11,7 +11,7 @@ from fatcat_openapi_client import (
WebcaptureEntity,
)
-from .common import EntityImporter, make_rel_url
+from .common import EntityImporter, filesets_very_similar, make_rel_url
class IngestFileResultImporter(EntityImporter):
@@ -260,6 +260,16 @@ class IngestFileResultImporter(EntityImporter):
edit_extra["grobid_status_code"] = row["grobid"]["status_code"]
edit_extra["grobid_version"] = row["grobid"].get("grobid_version")
+ # fileset/platform metadata
+ if row.get("ingest_strategy"):
+ edit_extra["ingest_strategy"] = row["ingest_strategy"]
+ if row.get("platform_domain"):
+ edit_extra["platform_domain"] = row["platform_domain"]
+ if row.get("platform_name"):
+ edit_extra["platform_name"] = row["platform_name"]
+ if row.get("platform_id"):
+ edit_extra["platform_id"] = row["platform_id"]
+
return edit_extra
def parse_record(self, row: Dict[str, Any]) -> FileEntity:
@@ -518,7 +528,6 @@ class IngestWebResultImporter(IngestFileResultImporter):
)
edit_extra = self.parse_edit_extra(row)
-
if edit_extra:
wc.edit_extra = edit_extra
return wc
@@ -675,9 +684,9 @@ class IngestFilesetResultImporter(IngestFileResultImporter):
return True
def parse_fileset_urls(self, row: Dict[str, Any]) -> List[FilesetUrl]:
- if not row.get("strategy"):
+ if not row.get("ingest_strategy"):
return []
- strategy = row["strategy"]
+ strategy = row["ingest_strategy"]
urls = []
if strategy == "archiveorg-fileset" and row.get("archiveorg_item_name"):
urls.append(
@@ -686,17 +695,14 @@ class IngestFilesetResultImporter(IngestFileResultImporter):
rel="archive-base",
)
)
- if row["strategy"].startswith("web-") and row.get("platform_base_url"):
+ if strategy.startswith("web-") and row.get("platform_base_url"):
urls.append(
fatcat_openapi_client.FilesetUrl(
url=f"https://web.archive.org/web/{row['web_base_url_dt']}/{row['web_base_url']}",
rel="webarchive-base",
)
)
- # TODO: repository-base
- # TODO: web-base
-
- if row["strategy"] == "archiveorg-fileset-bundle" and row.get("archiveorg_item_name"):
+ if strategy == "archiveorg-fileset-bundle" and row.get("archiveorg_item_name"):
urls.append(
fatcat_openapi_client.FilesetUrl(
url=f"https://archive.org/download/{row['archiveorg_item_name']}/{row['archiveorg_bundle_path']}",
@@ -704,7 +710,7 @@ class IngestFilesetResultImporter(IngestFileResultImporter):
)
)
- if row["strategy"] == "web-fileset-bundle" and row.get("platform_bundle_url"):
+ if strategy == "web-fileset-bundle" and row.get("platform_bundle_url"):
urls.append(
fatcat_openapi_client.FilesetUrl(
url=f"https://web.archive.org/web/{row['web_bundle_url_dt']}/{row['web_bundle_url']}",
@@ -727,6 +733,15 @@ class IngestFilesetResultImporter(IngestFileResultImporter):
rel="repository-base",
)
)
+ elif row.get("terminal"):
+ # fallback generic web URL
+ urls.append(
+ fatcat_openapi_client.FilesetUrl(
+ url=row["terminal"]["terminal_url"],
+ rel="web",
+ )
+ )
+
return urls
def parse_record(self, row: Dict[str, Any]) -> FilesetEntity:
@@ -748,12 +763,6 @@ class IngestFilesetResultImporter(IngestFileResultImporter):
return None
entity_extra: Dict[str, Any] = dict()
- edit_extra = self.parse_edit_extra(row)
- edit_extra["ingest_strategy"] = row["ingest_strategy"]
- if row.get("platform"):
- edit_extra["platform"] = row["platform"]
- if row.get("platform_id"):
- edit_extra["platform_id"] = row["platform_id"]
entity_urls = self.parse_fileset_urls(row)
if not entity_urls:
@@ -770,33 +779,33 @@ class IngestFilesetResultImporter(IngestFileResultImporter):
fsf = fatcat_openapi_client.FilesetFile(
path=ingest_file["path"],
size=ingest_file["size"],
- md5=ingest_file["md5"],
- sha1=ingest_file["sha1"],
+ md5=ingest_file.get("md5"),
+ sha1=ingest_file.get("sha1"),
sha256=ingest_file.get("sha256"),
- extra=dict(
- mimetype=ingest_file["mimetype"],
- ),
+ mimetype=ingest_file.get("mimetype"),
+ extra=dict(),
)
- if not (fsf.md5 and fsf.sha1 and fsf.path and fsf.size):
+ if not (fsf.md5 and fsf.sha1 and fsf.path and fsf.size and fsf.mimetype):
self.counts["skip-partial-file-info"] += 1
return None
if ingest_file.get("platform_url"):
- # XXX: should we include this?
fsf.extra["original_url"] = ingest_file["platform_url"]
if ingest_file.get("terminal_url") and ingest_file.get("terminal_dt"):
fsf.extra[
"wayback_url"
] = f"https://web.archive.org/web/{ingest_file['terminal_dt']}/{ingest_file['terminal_url']}"
+ if not fsf.extra:
+ fsf.extra = None
manifest.append(fsf)
fe = fatcat_openapi_client.FilesetEntity(
manifest=manifest,
urls=entity_urls,
release_ids=[release_ident],
+ extra=entity_extra or None,
)
- if entity_extra:
- fe.extra = entity_extra
+ edit_extra = self.parse_edit_extra(row)
if edit_extra:
fe.edit_extra = edit_extra
return fe
@@ -805,26 +814,29 @@ class IngestFilesetResultImporter(IngestFileResultImporter):
# check for existing edits-in-progress with same URL
for other in self._entity_queue:
- # XXX: how to duplicate check?
- if other.original_url == fse.original_url:
+ if filesets_very_similar(other, fse):
self.counts["skip-in-queue"] += 1
+ self.counts["skip"] += 1
return False
# lookup sha1, or create new entity (TODO: API doesn't support this yet)
# existing = None
# NOTE: in lieu of existing checks (by lookup), only allow one fileset per release
- release = self.api.get_release(fse.release_ids[0], expand="filesets")
- if release.filesets:
- # XXX: how to duplicate check filesets?
+ if not self.bezerk_mode:
+ release = self.api.get_release(fse.release_ids[0], expand="filesets")
+
# check if this is an existing match, or just a similar hit
- for other in release.filesets:
- if fse.original_url == other.original_url:
- # TODO: compare very similar timestamps of same time (different formats)
+ for other in release.filesets or []:
+ if filesets_very_similar(other, fse):
self.counts["exists"] += 1
return False
- self.counts["skip-release-has-fileset"] += 1
- return False
+
+ # for now, being conservative and just skipping if release has any other fileset
+ if release.filesets:
+ self.counts["skip-release-has-fileset"] += 1
+ self.counts["skip"] += 1
+ return False
return True
@@ -849,6 +861,197 @@ class IngestFilesetResultImporter(IngestFileResultImporter):
)
+class IngestFilesetFileResultImporter(IngestFileResultImporter):
+ """
+ Variant of IngestFileResultImporter for processing dataset (fileset) ingest
+ results that resulted in a single file, importing them as File entities.
+ """
+
+ def __init__(self, api: ApiClient, **kwargs) -> None:
+
+ eg_desc = (
+ kwargs.pop("editgroup_description", None)
+ or "Single files crawled from web using sandcrawler ingest tool, in dataset mode"
+ )
+ eg_extra = kwargs.pop("editgroup_extra", dict())
+ eg_extra["agent"] = eg_extra.get(
+ "agent", "fatcat_tools.IngestFilesetFileResultImporter"
+ )
+ kwargs["do_updates"] = False
+ super().__init__(api, editgroup_description=eg_desc, editgroup_extra=eg_extra, **kwargs)
+ self.max_file_count = 300
+
+ def want_fileset(self, row: Dict[str, Any]) -> bool:
+
+ manifest: Optional[List[Any]] = row.get("manifest")
+ if not manifest or len(manifest) == 0:
+ self.counts["skip-empty-manifest"] += 1
+ return False
+
+ if len(manifest) > 1:
+ self.counts["skip-multiple-files"] += 1
+ return False
+
+ assert len(manifest) == 1
+ return True
+
+ def want(self, row: Dict[str, Any]) -> bool:
+
+ if not self.want_ingest(row):
+ return False
+
+ if row.get("status") != "success-file":
+ self.counts["skip-status"] += 1
+ return False
+
+ # fileset-specific filters
+ if row["request"].get("ingest_type") not in [
+ "dataset",
+ ]:
+ self.counts["skip-ingest-type"] += 1
+ return False
+
+ if not self.want_fileset(row):
+ return False
+
+ return True
+
+ def parse_fileset_urls(self, row: Dict[str, Any]) -> List[FilesetUrl]:
+ if not row.get("ingest_strategy"):
+ return []
+ strategy = row["ingest_strategy"]
+ urls = []
+ # base and bundle URLs, depending on ingest strategy
+ if strategy == "archiveorg-fileset" and row.get("archiveorg_item_name"):
+ urls.append(
+ fatcat_openapi_client.FilesetUrl(
+ url=f"https://archive.org/download/{row['archiveorg_item_name']}/",
+ rel="archive-base",
+ )
+ )
+ if strategy.startswith("web-") and row.get("platform_base_url"):
+ urls.append(
+ fatcat_openapi_client.FilesetUrl(
+ url=f"https://web.archive.org/web/{row['web_base_url_dt']}/{row['web_base_url']}",
+ rel="webarchive-base",
+ )
+ )
+ if strategy == "archiveorg-fileset-bundle" and row.get("archiveorg_item_name"):
+ urls.append(
+ fatcat_openapi_client.FilesetUrl(
+ url=f"https://archive.org/download/{row['archiveorg_item_name']}/{row['archiveorg_bundle_path']}",
+ rel="archive-bundle",
+ )
+ )
+
+ if strategy == "web-fileset-bundle" and row.get("platform_bundle_url"):
+ urls.append(
+ fatcat_openapi_client.FilesetUrl(
+ url=f"https://web.archive.org/web/{row['web_bundle_url_dt']}/{row['web_bundle_url']}",
+ rel="webarchive-bundle",
+ )
+ )
+
+ # add any additional / platform URLs here
+ if row.get("platform_bundle_url"):
+ urls.append(
+ fatcat_openapi_client.FilesetUrl(
+ url=row["platform_bundle_url"],
+ rel="repository-bundle",
+ )
+ )
+ if row.get("platform_base_url"):
+ urls.append(
+ fatcat_openapi_client.FilesetUrl(
+ url=row["platform_bundle_url"],
+ rel="repository-base",
+ )
+ )
+ elif row.get("terminal"):
+ # fallback generic web URL
+ urls.append(
+ fatcat_openapi_client.FilesetUrl(
+ url=row["terminal"]["terminal_url"],
+ rel="web",
+ )
+ )
+
+ return urls
+
+ def parse_record(self, row: Dict[str, Any]) -> FileEntity:
+
+ request = row["request"]
+
+ # double check that want() filtered request correctly
+ if request.get("ingest_type") not in [
+ "dataset",
+ ]:
+ self.counts["skip-ingest-type"] += 1
+ return None
+
+ # identify release by fatcat ident, or extid lookup
+ release_ident = self.parse_ingest_release_ident(row)
+
+ if not release_ident:
+ self.counts["skip-release-not-found"] += 1
+ return None
+
+ assert row["file_count"] == len(row["manifest"]) == 1
+ file_meta = row["manifest"][0]
+ # print(file_meta)
+ assert file_meta["status"] == "success"
+
+ # add file-level access URLs
+ entity_urls = []
+ if file_meta.get("platform_url"):
+ entity_urls.append(FileUrl(rel="web", url=file_meta["platform_url"]))
+ if file_meta.get("terminal_url") and file_meta.get("terminal_dt"):
+ entity_urls.append(
+ FileUrl(
+ rel="webarchive",
+ url=f"https://web.archive.org/web/{file_meta['terminal_dt']}/{file_meta['terminal_url']}",
+ )
+ )
+ if row["ingest_strategy"] == "archiveorg-file":
+ entity_urls.append(
+ FileUrl(
+ rel="archive",
+ url=f"https://archive.org/download/{row['archiveorg_item_name']}/{file_meta['path']}",
+ )
+ )
+
+ if not entity_urls:
+ self.counts["skip-no-access-url"] += 1
+ return None
+
+ entity_extra: Dict[str, Any] = dict()
+ entity_extra["path"] = file_meta["path"]
+
+ # this is to work around a bug in old sandcrawler ingest code
+ if file_meta["md5"] == file_meta["sha1"]:
+ self.counts["skip-bad-hashes"] += 1
+ return None
+
+ fe = FileEntity(
+ md5=file_meta["md5"],
+ sha1=file_meta["sha1"],
+ sha256=file_meta["sha256"],
+ size=file_meta["size"],
+ mimetype=file_meta["mimetype"],
+ release_ids=[release_ident],
+ urls=entity_urls,
+ extra=entity_extra or None,
+ )
+ if not (fe.md5 and fe.sha1 and fe.sha256 and (fe.size is not None) and fe.mimetype):
+ self.counts["skip-partial-file-info"] += 1
+ return None
+
+ edit_extra = self.parse_edit_extra(row)
+ if edit_extra:
+ fe.edit_extra = edit_extra
+ return fe
+
+
class SavePaperNowFilesetImporter(IngestFilesetResultImporter):
"""
Like SavePaperNowFileImporter, but for fileset/dataset ingest.