From bb06c833f500fbd37579ffb4aa1c53dc0d1e9c96 Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Fri, 15 Oct 2021 18:04:39 -0700
Subject: persist support for ingest platform table, using existing persist
 worker

---
 python/sandcrawler/persist.py | 63 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 62 insertions(+), 1 deletion(-)

diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 7fe59f1..af702ca 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
         ingest_type = raw['request'].get('ingest_type')
         if ingest_type == 'file':
             ingest_type = 'pdf'
-        if ingest_type not in ('pdf', 'xml', 'html'):
+        if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', 'dataset-file'):
             self.counts['skip-ingest-type'] += 1
             return None
         if raw['status'] in ("existing", ):
@@ -179,6 +179,47 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             resources=record.get('html_resources'),
         )
 
+    def result_to_platform_row(self, raw: dict) -> Optional[dict]:
+        """
+        Converts fileset ingest-result JSON schema (e.g., from Kafka) to SQL ingest_fileset_platform schema.
+
+        If there is a problem with conversion, return None.
+        """
+        for k in ('request', 'hit', 'status'):
+            if k not in raw:
+                return None
+        if 'base_url' not in raw['request']:
+            return None
+        ingest_type = raw['request'].get('ingest_type')
+        if ingest_type not in ('dataset',):
+            return None
+        if raw['status'] in ("existing", ):
+            return None
+        if not raw.get('platform_name'):
+            return None
+        result = {
+            'ingest_type': ingest_type,
+            'base_url': raw['request']['base_url'],
+            'hit': raw['hit'],
+            'status': raw['status'],
+            'platform_name': raw.get('platform_name'),
+            'platform_domain': raw.get('platform_domain'),
+            'platform_id': raw.get('platform_id'),
+            'ingest_strategy': raw.get('ingest_strategy'),
+            'total_size': raw.get('total_size'),
+            'file_count': raw.get('file_count'),
+            'archiveorg_item_name': raw.get('archiveorg_item_name'),
+            'archiveorg_item_bundle_path': None,
+            'web_bundle_url': None,
+            'web_bundle_dt': None,
+            'manifest': raw.get('manifest'),
+        }
+        if raw.get('fileset_bundle'):
+            result['archiveorg_item_bundle_path'] = raw['fileset_bundle'].get('archiveorg_item_bundle_path')
+            result['web_bundle_url'] = raw['fileset_bundle'].get('terminal', {}).get('terminal_url')
+            result['web_bundle_dt'] = raw['fileset_bundle'].get('terminal', {}).get('terminal_dt')
+        return result
+
     def push_batch(self, batch):
         self.counts['total'] += len(batch)
 
@@ -223,9 +264,29 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             self.counts['insert-html_meta'] += resp[0]
             self.counts['update-html_meta'] += resp[1]
 
+        fileset_platform_batch = [self.result_to_platform_row(raw) for raw in batch if raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')]
+        fileset_platform_batch = [p for p in fileset_platform_batch if p]
+        if fileset_platform_batch:
+            resp = self.db.insert_ingest_fileset_platform(self.cur, fileset_platform_batch, on_conflict="update")
+            self.counts['insert-fileset_platform'] += resp[0]
+            self.counts['update-fileset_platform'] += resp[1]
+
         self.db.commit()
         return []
 
+class PersistIngestFilesetWorker(SandcrawlerWorker):
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+
+    def process(self, record, key=None):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
 class PersistIngestRequestWorker(PersistIngestFileResultWorker):
 
     def __init__(self, db_url, **kwargs):
--
cgit v1.2.3