aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/db.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/db.py')
-rw-r--r--python/sandcrawler/db.py68
1 files changed, 67 insertions, 1 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index e60b310..9b55c0c 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -77,6 +77,18 @@ class SandcrawlerPostgrestClient:
else:
return None
+ def get_ingest_fileset_platform(self, ingest_type: str, url: str) -> Optional[dict]:
+ resp = requests.get(
+ self.api_url + "/ingest_fileset_platform",
+ params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"),
+ )
+ resp.raise_for_status()
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
+ else:
+ return None
+
def get_crossref(self, doi):
resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.'+doi))
resp.raise_for_status()
@@ -382,7 +394,7 @@ class SandcrawlerPostgresClient:
INSERT INTO
ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
VALUES %s
- ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO
+ ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO
"""
if on_conflict.lower() == "nothing":
sql += " NOTHING"
@@ -416,3 +428,57 @@ class SandcrawlerPostgresClient:
batch = list(batch_dict.values())
resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_ingest_fileset_platform(self, cur, batch, on_conflict="nothing"):
+ sql = """
+ INSERT INTO
+ ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest)
+ VALUES %s
+ ON CONFLICT ON CONSTRAINT ingest_fileset_platform_pkeypkey DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ updated=now(),
+ hit=EXCLUDED.hit,
+ status=EXCLUDED.status,
+ platform_name=EXCLUDED.platform_name,
+ platform_domain=EXCLUDED.platform_domain,
+ platform_id=EXCLUDED.platform_id,
+ ingest_strategy=EXCLUDED.ingest_strategy,
+ total_size=EXCLUDED.total_size,
+ file_count=EXCLUDED.file_count,
+ archiveorg_item_name=EXCLUDED.archiveorg_item_name,
+ archiveorg_item_bundle_path=EXCLUDED.archiveorg_item_bundle_path,
+ web_bundle_url=EXCLUDED.web_bundle_url,
+ web_bundle_dt=EXCLUDED.web_bundle_dt,
+ manifest=EXCLUDED.manifest
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ batch = [(d['ingest_type'],
+ d['base_url'],
+ bool(d['hit']),
+ d['status'],
+ d.get('platform_name'),
+ d.get('platform_domain'),
+ d.get('platform_id'),
+ d.get('ingest_strategy'),
+ d.get('total_size'),
+ d.get('file_count'),
+ d.get('archiveorg_item_name'),
+ d.get('archiveorg_item_bundle_path'),
+ d.get('web_bundle_url'),
+ d.get('web_bundle_dt'),
+ d.get('manifest'),
+ )
+ for d in batch]
+ # filter out duplicate rows by key (ingest_type, base_url)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[(b[0], b[1])] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)