diff options
Diffstat (limited to 'python/sandcrawler/db.py')
-rw-r--r-- | python/sandcrawler/db.py | 68 |
1 files changed, 67 insertions, 1 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index e60b310..9b55c0c 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -77,6 +77,18 @@ class SandcrawlerPostgrestClient: else: return None + def get_ingest_fileset_platform(self, ingest_type: str, url: str) -> Optional[dict]: + resp = requests.get( + self.api_url + "/ingest_fileset_platform", + params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"), + ) + resp.raise_for_status() + resp_json = resp.json() + if resp_json: + return resp_json[0] + else: + return None + def get_crossref(self, doi): resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.'+doi)) resp.raise_for_status() @@ -382,7 +394,7 @@ class SandcrawlerPostgresClient: INSERT INTO ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex) VALUES %s - ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO + ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" @@ -416,3 +428,57 @@ class SandcrawlerPostgresClient: batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) + + def insert_ingest_fileset_platform(self, cur, batch, on_conflict="nothing"): + sql = """ + INSERT INTO + ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest) + VALUES %s + ON CONFLICT ON CONSTRAINT ingest_fileset_platform_pkeypkey DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + updated=now(), + hit=EXCLUDED.hit, + status=EXCLUDED.status, + platform_name=EXCLUDED.platform_name, + platform_domain=EXCLUDED.platform_domain, + platform_id=EXCLUDED.platform_id, + ingest_strategy=EXCLUDED.ingest_strategy, + total_size=EXCLUDED.total_size, + file_count=EXCLUDED.file_count, + archiveorg_item_name=EXCLUDED.archiveorg_item_name, + archiveorg_item_bundle_path=EXCLUDED.archiveorg_item_bundle_path, + web_bundle_url=EXCLUDED.web_bundle_url, + web_bundle_dt=EXCLUDED.web_bundle_dt, + manifest=EXCLUDED.manifest + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + batch = [(d['ingest_type'], + d['base_url'], + bool(d['hit']), + d['status'], + d.get('platform_name'), + d.get('platform_domain'), + d.get('platform_id'), + d.get('ingest_strategy'), + d.get('total_size'), + d.get('file_count'), + d.get('archiveorg_item_name'), + d.get('archiveorg_item_bundle_path'), + d.get('web_bundle_url'), + d.get('web_bundle_dt'), + d.get('manifest'), + ) + for d in batch] + # filter out duplicate rows by key (ingest_type, base_url) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1])] = b + batch = list(batch_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) |