aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2021-10-15 18:04:39 -0700
committerBryan Newbold <bnewbold@archive.org>2021-10-15 18:15:29 -0700
commitbb06c833f500fbd37579ffb4aa1c53dc0d1e9c96 (patch)
treeb00d936c4cf6d073405ef5590dd93bc07431f6bf
parent350a4e64aa60896391c1040d958b6b039ea3a79f (diff)
downloadsandcrawler-bb06c833f500fbd37579ffb4aa1c53dc0d1e9c96.tar.gz
sandcrawler-bb06c833f500fbd37579ffb4aa1c53dc0d1e9c96.zip
persist support for ingest platform table, using existing persist worker
-rw-r--r--proposals/2021-09-09_fileset_ingest.md4
-rw-r--r--python/sandcrawler/db.py68
-rw-r--r--python/sandcrawler/persist.py63
3 files changed, 131 insertions, 4 deletions
diff --git a/proposals/2021-09-09_fileset_ingest.md b/proposals/2021-09-09_fileset_ingest.md
index fbbccf9..beb9d6a 100644
--- a/proposals/2021-09-09_fileset_ingest.md
+++ b/proposals/2021-09-09_fileset_ingest.md
@@ -289,8 +289,8 @@ Note that this table *complements* `ingest_file_result`, doesn't replace it.
);
CREATE INDEX ingest_fileset_platform_name_domain_id_idx ON ingest_fileset_platform(platform_name, platform_domain, platform_id);
-Persist worker should only insert in to this table if `platform_name`,
-`platform_domain`, and `platform_id` are extracted successfully.
+Persist worker should only insert in to this table if `platform_name` is
+identified.
## New Kafka Topic
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index e60b310..9b55c0c 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -77,6 +77,18 @@ class SandcrawlerPostgrestClient:
else:
return None
+ def get_ingest_fileset_platform(self, ingest_type: str, url: str) -> Optional[dict]:
+ resp = requests.get(
+ self.api_url + "/ingest_fileset_platform",
+ params=dict(ingest_type=f"eq.{ingest_type}", base_url=f"eq.{url}"),
+ )
+ resp.raise_for_status()
+ resp_json = resp.json()
+ if resp_json:
+ return resp_json[0]
+ else:
+ return None
+
def get_crossref(self, doi):
resp = requests.get(self.api_url + "/crossref", params=dict(doi='eq.'+doi))
resp.raise_for_status()
@@ -382,7 +394,7 @@ class SandcrawlerPostgresClient:
INSERT INTO
ingest_file_result (ingest_type, base_url, hit, status, terminal_url, terminal_dt, terminal_status_code, terminal_sha1hex)
VALUES %s
- ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO
+ ON CONFLICT ON CONSTRAINT ingest_file_result_pkey DO
"""
if on_conflict.lower() == "nothing":
sql += " NOTHING"
@@ -416,3 +428,57 @@ class SandcrawlerPostgresClient:
batch = list(batch_dict.values())
resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
return self._inserts_and_updates(resp, on_conflict)
+
+ def insert_ingest_fileset_platform(self, cur, batch, on_conflict="nothing"):
+ sql = """
+ INSERT INTO
+ ingest_fileset_platform (ingest_type, base_url, hit, status, platform_name, platform_domain, platform_id, ingest_strategy, total_size, file_count, archiveorg_item_name, archiveorg_item_bundle_path, web_bundle_url, web_bundle_dt, manifest)
+ VALUES %s
+ ON CONFLICT ON CONSTRAINT ingest_fileset_platform_pkeypkey DO
+ """
+ if on_conflict.lower() == "nothing":
+ sql += " NOTHING"
+ elif on_conflict.lower() == "update":
+ sql += """ UPDATE SET
+ updated=now(),
+ hit=EXCLUDED.hit,
+ status=EXCLUDED.status,
+ platform_name=EXCLUDED.platform_name,
+ platform_domain=EXCLUDED.platform_domain,
+ platform_id=EXCLUDED.platform_id,
+ ingest_strategy=EXCLUDED.ingest_strategy,
+ total_size=EXCLUDED.total_size,
+ file_count=EXCLUDED.file_count,
+ archiveorg_item_name=EXCLUDED.archiveorg_item_name,
+ archiveorg_item_bundle_path=EXCLUDED.archiveorg_item_bundle_path,
+ web_bundle_url=EXCLUDED.web_bundle_url,
+ web_bundle_dt=EXCLUDED.web_bundle_dt,
+ manifest=EXCLUDED.manifest
+ """
+ else:
+ raise NotImplementedError("on_conflict: {}".format(on_conflict))
+ sql += " RETURNING xmax;"
+ batch = [(d['ingest_type'],
+ d['base_url'],
+ bool(d['hit']),
+ d['status'],
+ d.get('platform_name'),
+ d.get('platform_domain'),
+ d.get('platform_id'),
+ d.get('ingest_strategy'),
+ d.get('total_size'),
+ d.get('file_count'),
+ d.get('archiveorg_item_name'),
+ d.get('archiveorg_item_bundle_path'),
+ d.get('web_bundle_url'),
+ d.get('web_bundle_dt'),
+ d.get('manifest'),
+ )
+ for d in batch]
+ # filter out duplicate rows by key (ingest_type, base_url)
+ batch_dict = dict()
+ for b in batch:
+ batch_dict[(b[0], b[1])] = b
+ batch = list(batch_dict.values())
+ resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+ return self._inserts_and_updates(resp, on_conflict)
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 7fe59f1..af702ca 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -137,7 +137,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
ingest_type = raw['request'].get('ingest_type')
if ingest_type == 'file':
ingest_type = 'pdf'
- if ingest_type not in ('pdf', 'xml', 'html'):
+ if ingest_type not in ('pdf', 'xml', 'html', 'component', 'src', 'dataset', 'dataset-file'):
self.counts['skip-ingest-type'] += 1
return None
if raw['status'] in ("existing", ):
@@ -179,6 +179,47 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
resources=record.get('html_resources'),
)
+ def result_to_platform_row(self, raw: dict) -> Optional[dict]:
+ """
+ Converts fileset ingest-result JSON schema (eg, from Kafka) to SQL ingest_fileset_platform schema
+
+ if there is a problem with conversion, return None and set skip count
+ """
+ for k in ('request', 'hit', 'status'):
+ if not k in raw:
+ return None
+ if not 'base_url' in raw['request']:
+ return None
+ ingest_type = raw['request'].get('ingest_type')
+ if ingest_type not in ('dataset'):
+ return None
+ if raw['status'] in ("existing", ):
+ return None
+ if not raw.get('platform_name'):
+ return None
+ result = {
+ 'ingest_type': ingest_type,
+ 'base_url': raw['request']['base_url'],
+ 'hit': raw['hit'],
+ 'status': raw['status'],
+ 'platform_name': raw.get('platform_name'),
+ 'platform_domain': raw.get('platform_domain'),
+ 'platform_id': raw.get('platform_id'),
+ 'ingest_strategy': raw.get('ingest_strategy'),
+ 'total_size': raw.get('total_size'),
+ 'file_count': raw.get('file_count'),
+ 'archiveorg_item_name': raw.get('archiveorg_item_name'),
+ 'archiveorg_item_bundle_path': None,
+ 'web_bundle_url': None,
+ 'web_bundle_dt': None,
+ 'manifest': raw.get('manifest'),
+ }
+ if result.get('fileset_bundle'):
+ result['archiveorg_item_bundle_path'] = result['fileset_bundle'].get('archiveorg_item_bundle_path')
+ result['web_bundle_url'] = result['fileset_bundle'].get('terminal', {}).get('terminal_url')
+ result['web_bundle_dt'] = result['fileset_bundle'].get('terminal', {}).get('terminal_dt')
+ return result
+
def push_batch(self, batch):
self.counts['total'] += len(batch)
@@ -223,9 +264,29 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.counts['insert-html_meta'] += resp[0]
self.counts['update-html_meta'] += resp[1]
+ fileset_platform_batch = [self.result_to_platform_row(raw) for raw in batch if raw.get('request', {}).get('ingest_type') == 'dataset' and raw.get('platform_name')]
+ fileset_platform_batch = [p for p in fileset_platform_batch if p]
+ if fileset_platform_batch:
+ resp = self.db.insert_ingest_fileset_platform(self.cur, fileset_platform_batch, on_conflict="update")
+ self.counts['insert-fileset_platform'] += resp[0]
+ self.counts['update-fileset_platform'] += resp[1]
+
self.db.commit()
return []
+class PersistIngestFilesetWorker(SandcrawlerWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+
+ def process(self, record, key=None):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
class PersistIngestRequestWorker(PersistIngestFileResultWorker):
def __init__(self, db_url, **kwargs):