Diffstat (limited to 'python/sandcrawler/persist.py')
-rw-r--r--  python/sandcrawler/persist.py  63
1 file changed, 62 insertions(+), 1 deletion(-)
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 7fe59f1..af702ca 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
         ingest_type = raw['request'].get('ingest_type')
         if ingest_type == 'file':
             ingest_type = 'pdf'
-        if ingest_type not in ('pdf', 'xml', 'html'):
+        if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', 'dataset-file'):
             self.counts['skip-ingest-type'] += 1
             return None
         if raw['status'] in ("existing", ):
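
A side note on the membership checks in this file: Python only treats parentheses as a tuple when there is a trailing comma, which is why the code writes ("existing", ). Without the comma, ('dataset') is just the string 'dataset', and the `in` operator falls back to substring matching. A quick standalone illustration (values are for demonstration only):

    >>> 'dataset' in ('dataset', )   # 1-tuple: exact membership test
    True
    >>> 'data' in ('dataset', )
    False
    >>> 'data' in ('dataset')        # plain string: substring match, likely unintended
    True
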
@@ -179,6 +179,47 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             resources=record.get('html_resources'),
         )
+
+    def result_to_platform_row(self, raw: dict) -> Optional[dict]:
+        """
+        Converts a fileset ingest-result record (eg, JSON from Kafka) to the SQL ingest_fileset_platform schema.
+
+        If there is a problem with conversion, returns None so the caller can skip the row.
+        """
+        for k in ('request', 'hit', 'status'):
+            if k not in raw:
+                return None
+        if 'base_url' not in raw['request']:
+            return None
+        ingest_type = raw['request'].get('ingest_type')
+        if ingest_type not in ('dataset', ):
+            return None
+        if raw['status'] in ("existing", ):
+            return None
+        if not raw.get('platform_name'):
+            return None
+        result = {
+            'ingest_type': ingest_type,
+            'base_url': raw['request']['base_url'],
+            'hit': raw['hit'],
+            'status': raw['status'],
+            'platform_name': raw.get('platform_name'),
+            'platform_domain': raw.get('platform_domain'),
+            'platform_id': raw.get('platform_id'),
+            'ingest_strategy': raw.get('ingest_strategy'),
+            'total_size': raw.get('total_size'),
+            'file_count': raw.get('file_count'),
+            'archiveorg_item_name': raw.get('archiveorg_item_name'),
+            'archiveorg_item_bundle_path': None,
+            'web_bundle_url': None,
+            'web_bundle_dt': None,
+            'manifest': raw.get('manifest'),
+        }
+        if raw.get('fileset_bundle'):
+            result['archiveorg_item_bundle_path'] = raw['fileset_bundle'].get('archiveorg_item_bundle_path')
+            result['web_bundle_url'] = raw['fileset_bundle'].get('terminal', {}).get('terminal_url')
+            result['web_bundle_dt'] = raw['fileset_bundle'].get('terminal', {}).get('terminal_dt')
+        return result
+
     def push_batch(self, batch):
         self.counts['total'] += len(batch)
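
To make the conversion above concrete, here is a minimal sketch of feeding result_to_platform_row a trimmed, hypothetical ingest-result record (field values are invented; __new__ is used only to skip the database connection in __init__, since the converter never reads instance state):

    from sandcrawler.persist import PersistIngestFileResultWorker

    raw = {
        'request': {'ingest_type': 'dataset', 'base_url': 'https://example.com/dataset/123'},
        'hit': True,
        'status': 'success',
        'platform_name': 'dataverse',
        'platform_domain': 'dataverse.example.edu',
        'platform_id': 'doi:10.1234/example',
        'ingest_strategy': 'archiveorg-fileset',
        'total_size': 123456,
        'file_count': 2,
        'archiveorg_item_name': 'dataset-123',
        'manifest': [{'path': 'data.csv', 'size': 123000}],
    }

    # Bypass __init__ (and its DB connection) for demonstration purposes
    worker = PersistIngestFileResultWorker.__new__(PersistIngestFileResultWorker)
    row = worker.result_to_platform_row(raw)
    assert row['platform_name'] == 'dataverse'
    assert row['web_bundle_url'] is None  # no 'fileset_bundle' in this sample record
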
@@ -223,9 +264,29 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
             self.counts['insert-html_meta'] += resp[0]
             self.counts['update-html_meta'] += resp[1]
+        fileset_platform_batch = [
+            self.result_to_platform_row(raw) for raw in batch
+            if raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')
+        ]
+        fileset_platform_batch = [p for p in fileset_platform_batch if p]
+        if fileset_platform_batch:
+            resp = self.db.insert_ingest_fileset_platform(self.cur, fileset_platform_batch, on_conflict="update")
+            self.counts['insert-fileset_platform'] += resp[0]
+            self.counts['update-fileset_platform'] += resp[1]
+
         self.db.commit()
         return []
+class PersistIngestFilesetWorker(SandcrawlerWorker):
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+
+    def process(self, record, key=None):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
 class PersistIngestRequestWorker(PersistIngestFileResultWorker):
     def __init__(self, db_url, **kwargs):
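
The new PersistIngestFilesetWorker lands here as a stub: process() raises because fileset persistence is meant to happen in batched transactions. As a rough sketch (not the project's actual implementation), a batch method could mirror the pattern PersistIngestFileResultWorker uses above, reusing its converter and the insert_ingest_fileset_platform helper that appears in this diff:

    class PersistIngestFilesetWorkerSketch(PersistIngestFilesetWorker):

        def push_batch(self, batch):
            self.counts['total'] += len(batch)
            # result_to_platform_row() never reads instance state, so an
            # unbound call borrowed from PersistIngestFileResultWorker is safe
            convert = PersistIngestFileResultWorker.result_to_platform_row
            rows = [convert(None, raw) for raw in batch]
            rows = [r for r in rows if r]
            if rows:
                resp = self.db.insert_ingest_fileset_platform(self.cur, rows, on_conflict="update")
                self.counts['insert-fileset_platform'] += resp[0]
                self.counts['update-fileset_platform'] += resp[1]
            self.db.commit()
            return []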