From bb06c833f500fbd37579ffb4aa1c53dc0d1e9c96 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 15 Oct 2021 18:04:39 -0700 Subject: persist support for ingest platform table, using existing persist worker --- proposals/2021-09-09_fileset_ingest.md | 4 +- python/sandcrawler/db.py | 68 +++++++++++++++++++++++++++++++++- python/sandcrawler/persist.py | 63 ++++++++++++++++++++++++++++++- 3 files changed, 131 insertions(+), 4 deletions(-) diff --git a/proposals/2021-09-09_fileset_ingest.md b/proposals/2021-09-09_fileset_ingest.md index fbbccf9..beb9d6a 100644 --- a/proposals/2021-09-09_fileset_ingest.md +++ b/proposals/2021-09-09_fileset_ingest.md @@ -289,8 +289,8 @@ Note that this table *complements* `ingest_file_result`, doesn't replace it. ); CREATE INDEX ingest_fileset_platform_name_domain_id_idx ON ingest_fileset_platform(platform_name, platform_domain, platform_id); -Persist worker should only insert in to this table if `platform_name`, -`platform_domain`, and `platform_id` are extracted successfully. +Persist worker should only insert in to this table if `platform_name` is +identified. ## New Kafka Topic diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py index e60b310..9b55c0c 100644 --- a/python/sandcrawler/db.py +++ b/python/sandcrawler/db.py @@ -77,6 +77,18 @@ class SandcrawlerPostgrestClient: else: return None + def get_ingest_fileset_platform(self, ingest_type: str, url: str) -> Optional[dict]: + resp = requests.get( + self.api_url + "/ingest_fileset_platform", + params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"), + ) + resp.raise_for_status() + resp_json = resp.json() + if resp_json: + return resp_json[0] + else: + return None + def get_crossref(self, doi): resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.'+doi)) resp.raise_for_status() @@ -382,7 +394,7 @@ class SandcrawlerPostgresClient: INSERT INTO ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex) VALUES %s - ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO + ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO """ if on_conflict.lower() == "nothing": sql += " NOTHING" @@ -416,3 +428,57 @@ class SandcrawlerPostgresClient: batch = list(batch_dict.values()) resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) return self._inserts_and_updates(resp, on_conflict) + + def insert_ingest_fileset_platform(self, cur, batch, on_conflict="nothing"): + sql = """ + INSERT INTO + ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest) + VALUES %s + ON CONFLICT ON CONSTRAINT ingest_fileset_platform_pkeypkey DO + """ + if on_conflict.lower() == "nothing": + sql += " NOTHING" + elif on_conflict.lower() == "update": + sql += """ UPDATE SET + updated=now(), + hit=EXCLUDED.hit, + status=EXCLUDED.status, + platform_name=EXCLUDED.platform_name, + platform_domain=EXCLUDED.platform_domain, + platform_id=EXCLUDED.platform_id, + ingest_strategy=EXCLUDED.ingest_strategy, + total_size=EXCLUDED.total_size, + file_count=EXCLUDED.file_count, + archiveorg_item_name=EXCLUDED.archiveorg_item_name, + archiveorg_item_bundle_path=EXCLUDED.archiveorg_item_bundle_path, + web_bundle_url=EXCLUDED.web_bundle_url, + web_bundle_dt=EXCLUDED.web_bundle_dt, + manifest=EXCLUDED.manifest + """ + else: + raise NotImplementedError("on_conflict: {}".format(on_conflict)) + sql += " RETURNING xmax;" + batch = [(d['ingest_type'], + d['base_url'], + bool(d['hit']), + d['status'], + d.get('platform_name'), + d.get('platform_domain'), + d.get('platform_id'), + d.get('ingest_strategy'), + d.get('total_size'), + d.get('file_count'), + d.get('archiveorg_item_name'), + d.get('archiveorg_item_bundle_path'), + d.get('web_bundle_url'), + d.get('web_bundle_dt'), + d.get('manifest'), + ) + for d in batch] + # filter out duplicate rows by key (ingest_type, base_url) + batch_dict = dict() + for b in batch: + batch_dict[(b[0], b[1])] = b + batch = list(batch_dict.values()) + resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True) + return self._inserts_and_updates(resp, on_conflict) diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py index 7fe59f1..af702ca 100644 --- a/python/sandcrawler/persist.py +++ b/python/sandcrawler/persist.py @@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): ingest_type = raw['request'].get('ingest_type') if ingest_type == 'file': ingest_type = 'pdf' - if ingest_type not in ('pdf', 'xml', 'html'): + if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', 'dataset-file'): self.counts['skip-ingest-type'] += 1 return None if raw['status'] in ("existing", ): @@ -179,6 +179,47 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): resources=record.get('html_resources'), ) + def result_to_platform_row(self, raw: dict) -> Optional[dict]: + """ + Converts fileset ingest-result JSON schema (eg, from Kafka) to SQL ingest_fileset_platform schema + + if there is a problem with conversion, return None and set skip count + """ + for k in ('request', 'hit', 'status'): + if not k in raw: + return None + if not 'base_url' in raw['request']: + return None + ingest_type = raw['request'].get('ingest_type') + if ingest_type not in ('dataset'): + return None + if raw['status'] in ("existing", ): + return None + if not raw.get('platform_name'): + return None + result = { + 'ingest_type': ingest_type, + 'base_url': raw['request']['base_url'], + 'hit': raw['hit'], + 'status': raw['status'], + 'platform_name': raw.get('platform_name'), + 'platform_domain': raw.get('platform_domain'), + 'platform_id': raw.get('platform_id'), + 'ingest_strategy': raw.get('ingest_strategy'), + 'total_size': raw.get('total_size'), + 'file_count': raw.get('file_count'), + 'archiveorg_item_name': raw.get('archiveorg_item_name'), + 'archiveorg_item_bundle_path': None, + 'web_bundle_url': None, + 'web_bundle_dt': None, + 'manifest': raw.get('manifest'), + } + if result.get('fileset_bundle'): + result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get('archiveorg_item_bundle_path') + result['web_bundle_url'] = result['fileset_bundle'].get('terminal', {}).get('terminal_url') + result['web_bundle_dt'] = result['fileset_bundle'].get('terminal', {}).get('terminal_dt') + return result + def push_batch(self, batch): self.counts['total'] += len(batch) @@ -223,9 +264,29 @@ class PersistIngestFileResultWorker(SandcrawlerWorker): self.counts['insert-html_meta'] += resp[0] self.counts['update-html_meta'] += resp[1] + fileset_platform_batch = [self.result_to_platform_row(raw) for raw in batch if raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')] + fileset_platform_batch = [p for p in fileset_platform_batch if p] + if fileset_platform_batch: + resp = self.db.insert_ingest_fileset_platform(self.cur, fileset_platform_batch, on_conflict="update") + self.counts['insert-fileset_platform'] += resp[0] + self.counts['update-fileset_platform'] += resp[1] + self.db.commit() return [] +class PersistIngestFilesetWorker(SandcrawlerWorker): + + def __init__(self, db_url, **kwargs): + super().__init__() + self.db = SandcrawlerPostgresClient(db_url) + self.cur = self.db.conn.cursor() + + def process(self, record, key=None): + """ + Only do batches (as transactions) + """ + raise NotImplementedError + class PersistIngestRequestWorker(PersistIngestFileResultWorker): def __init__(self, db_url, **kwargs): -- cgit v1.2.3