path: root/python/sandcrawler/persist.py
author    Bryan Newbold <bnewbold@archive.org>  2019-12-24 16:39:14 -0800
committer Bryan Newbold <bnewbold@archive.org>  2020-01-02 18:12:58 -0800
commit    460843e31ebea16fcb543b8448365cfe004103b0 (patch)
tree      f5576149afa91b6807decb048e2d5ba461a2a6da /python/sandcrawler/persist.py
parent    4a3eade72ea557eb5e04c59c454887d20c718314 (diff)
download  sandcrawler-460843e31ebea16fcb543b8448365cfe004103b0.tar.gz
          sandcrawler-460843e31ebea16fcb543b8448365cfe004103b0.zip
start work on persist workers and tool
Diffstat (limited to 'python/sandcrawler/persist.py')
-rw-r--r--  python/sandcrawler/persist.py  223
1 file changed, 223 insertions, 0 deletions
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
new file mode 100644
index 0000000..07e6c83
--- /dev/null
+++ b/python/sandcrawler/persist.py
@@ -0,0 +1,223 @@
+
+"""
+cdx
+- read raw CDX, filter
+- push to SQL table
+
+ingest-file-result
+- read JSON format (batch)
+- cdx SQL push batch (on conflict skip)
+- file_meta SQL push batch (on conflict update)
+- ingest request push batch (on conflict skip)
+- ingest result push batch (on conflict update)
+
+grobid
+- reads JSON format (batch)
+- grobid2json
+- minio push (one-by-one)
+- grobid SQL push batch (on conflict update)
+- file_meta SQL push batch (on conflict update)
+"""
+
+from sandcrawler.workers import SandcrawlerWorker
+from sandcrawler.db import SandcrawlerPostgresClient
+from sandcrawler.minio import SandcrawlerMinioClient
+from sandcrawler.grobid import GrobidClient
+
+
+class PersistCdxWorker(SandcrawlerWorker):
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+
+    def process(self, record):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
+    def push_batch(self, batch):
+        self.counts['total'] += len(batch)
+        self.db.insert_cdx(self.cur, batch)
+        self.counts['insert-cdx'] += len(batch)
+        self.db.commit()
+        return []
+
+class PersistIngestFileResultWorker(SandcrawlerWorker):
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+
+    def process(self, record):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
+    def request_to_row(self, raw):
+        """
+        Converts ingest-request JSON schema (eg, from Kafka) to SQL ingest_request schema
+
+        if there is a problem with conversion, return None
+        """
+        # backwards compat hacks; transform request to look like current schema
+        if raw.get('ingest_type') == 'file':
+            raw['ingest_type'] = 'pdf'
+        if (not raw.get('link_source')
+                and raw.get('base_url')
+                and raw.get('ext_ids', {}).get('doi')
+                and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])):
+            # set link_source(_id) for old ingest requests
+            raw['link_source'] = 'doi'
+            raw['link_source_id'] = raw['ext_ids']['doi']
+        if (not raw.get('link_source')
+                and raw.get('ingest_request_source', '').startswith('savepapernow')
+                and raw.get('fatcat', {}).get('release_ident')):
+            # set link_source(_id) for old ingest requests
+            raw['link_source'] = 'spn'
+            raw['link_source_id'] = raw['fatcat']['release_ident']
+
+        for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'):
+            if k not in raw:
+                self.counts['skip-fields'] += 1
+                return None
+        if raw['ingest_type'] not in ('pdf', 'xml'):
+            print(raw['ingest_type'])
+            self.counts['skip-ingest-type'] += 1
+            return None
+        request = {
+            'ingest_type': raw['ingest_type'],
+            'base_url': raw['base_url'],
+            'link_source': raw['link_source'],
+            'link_source_id': raw['link_source_id'],
+            'request': {},
+        }
+        # extra/optional fields
+        if raw.get('release_stage'):
+            request['release_stage'] = raw['release_stage']
+        if raw.get('fatcat', {}).get('release_ident'):
+            request['request']['release_ident'] = raw['fatcat']['release_ident']
+        for k in ('ext_ids', 'edit_extra'):
+            if raw.get(k):
+                request['request'][k] = raw[k]
+        # if this dict is empty, trim it to save DB space
+        if not request['request']:
+            request['request'] = None
+        return request
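For illustration only (the values below are made up, not from the commit), an old-style request coming off Kafka would be normalized by request_to_row() roughly like this:

    raw = {
        'ingest_type': 'file',                     # old name, rewritten to 'pdf'
        'base_url': 'https://doi.org/10.123/abc',
        'release_stage': 'published',
        'ext_ids': {'doi': '10.123/abc'},
        'fatcat': {'release_ident': 'ukxcrqrnanfcpgs7zpiigqsefm'},
    }

    # request_to_row(raw) returns:
    {
        'ingest_type': 'pdf',
        'base_url': 'https://doi.org/10.123/abc',
        'link_source': 'doi',                      # inferred from the doi.org base_url
        'link_source_id': '10.123/abc',
        'release_stage': 'published',
        'request': {
            'release_ident': 'ukxcrqrnanfcpgs7zpiigqsefm',
            'ext_ids': {'doi': '10.123/abc'},
        },
    }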
+
+
+    def file_result_to_row(self, raw):
+        """
+        Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema
+
+        if there is a problem with conversion, return None and set skip count
+        """
+        for k in ('request', 'hit', 'status'):
+            if k not in raw:
+                self.counts['skip-fields'] += 1
+                return None
+        if 'base_url' not in raw['request']:
+            self.counts['skip-fields'] += 1
+            return None
+        ingest_type = raw['request'].get('ingest_type')
+        if ingest_type == 'file':
+            ingest_type = 'pdf'
+        if ingest_type not in ('pdf', 'xml'):
+            self.counts['skip-ingest-type'] += 1
+            return None
+        result = {
+            'ingest_type': ingest_type,
+            'base_url': raw['request']['base_url'],
+            'hit': raw['hit'],
+            'status': raw['status'],
+        }
+        terminal = raw.get('terminal')
+        if terminal:
+            result['terminal_url'] = terminal['url']
+            if terminal.get('status_code') is None and terminal.get('http_status'):
+                terminal['status_code'] = terminal['http_status']
+            result['terminal_status_code'] = int(terminal['status_code'])
+            if raw.get('file_meta'):
+                result['terminal_sha1hex'] = raw['file_meta']['sha1hex']
+            if raw.get('cdx') and raw['cdx']['url'] == terminal['url']:
+                result['terminal_dt'] = raw['cdx']['datetime']
+        return result
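Similarly, a successful ingest result (again with invented values) maps onto an ingest_file_result row like so:

    raw = {
        'request': {'ingest_type': 'pdf', 'base_url': 'https://example.com/paper'},
        'hit': True,
        'status': 'success',
        'terminal': {'url': 'https://example.com/fulltext.pdf', 'status_code': 200},
        'file_meta': {'sha1hex': '0123456789abcdef0123456789abcdef01234567'},
        'cdx': {'url': 'https://example.com/fulltext.pdf', 'datetime': '20191224163914'},
    }

    # file_result_to_row(raw) returns:
    {
        'ingest_type': 'pdf',
        'base_url': 'https://example.com/paper',
        'hit': True,
        'status': 'success',
        'terminal_url': 'https://example.com/fulltext.pdf',
        'terminal_status_code': 200,
        'terminal_sha1hex': '0123456789abcdef0123456789abcdef01234567',
        'terminal_dt': '20191224163914',
    }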
+
+    def push_batch(self, batch):
+        self.counts['total'] += len(batch)
+
+        if not batch:
+            return []
+
+        results = [self.file_result_to_row(raw) for raw in batch]
+        results = [r for r in results if r]
+        requests = [self.request_to_row(raw['request']) for raw in batch if raw.get('request')]
+        requests = [r for r in requests if r]
+
+        if requests:
+            self.db.insert_ingest_request(self.cur, requests)
+            self.counts['insert-requests'] += len(requests)
+        if results:
+            self.db.insert_ingest_file_result(self.cur, results)
+            self.counts['insert-results'] += len(results)
+
+        # these schemas match, so can just pass through
+        # TODO: need to include warc_path etc in ingest-result
+        cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')]
+        if cdx_batch:
+            self.db.insert_cdx(self.cur, cdx_batch)
+            self.counts['insert-cdx'] += len(cdx_batch)
+        file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
+        if file_meta_batch:
+            self.db.insert_file_meta(self.cur, file_meta_batch)
+            self.counts['insert-file_meta'] += len(file_meta_batch)
+
+        self.db.commit()
+        return []
+
+
+class PersistGrobidWorker(SandcrawlerWorker):
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+        self.grobid = GrobidClient()
+
+    def process(self, record):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
+    def push_batch(self, batch):
+        self.counts['total'] += len(batch)
+
+        # enhance with teixml2json metadata, if available
+        for r in batch:
+            if r['status_code'] != 200 or not r.get('tei_xml'):
+                continue
+            metadata = self.grobid.metadata(r)
+            if not metadata:
+                continue
+            for k in ('fatcat_release', 'grobid_version'):
+                r[k] = metadata.pop(k)
+            if r.get('fatcat_release'):
+                r['fatcat_release'] = r['fatcat_release'].replace('release_', '')
+            r['metadata'] = metadata
+
+        # the records themselves already match the grobid SQL schema after the
+        # metadata enhancement above, so insert the whole batch
+        self.db.insert_grobid(self.cur, batch)
+        self.counts['insert-grobid'] += len(batch)
+
+        file_meta_batch = [r['file_meta'] for r in batch if r.get('file_meta')]
+        if file_meta_batch:
+            self.db.insert_file_meta(self.cur, file_meta_batch)
+            self.counts['insert-file_meta'] += len(file_meta_batch)
+
+        # TODO: minio, grobid
+
+        self.db.commit()
+        return []
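The GROBID records consumed here are expected to look roughly like the sketch below (field values invented; only status_code, tei_xml, and file_meta are actually referenced above). After the enhancement loop the record also carries grobid_version, fatcat_release (with any 'release_' prefix stripped), and the remaining teixml2json fields under 'metadata':

    record = {
        'status_code': 200,
        'tei_xml': '<TEI xmlns="http://www.tei-c.org/ns/1.0">...</TEI>',
        'file_meta': {'sha1hex': '0123456789abcdef0123456789abcdef01234567'},
    }
    # after enhancement, for example:
    #   record['grobid_version'] == '0.5.5-fatcat'
    #   record['fatcat_release'] == 'ukxcrqrnanfcpgs7zpiigqsefm'  (was 'release_ukxcrqrnanfcpgs7zpiigqsefm')
    #   record['metadata'] == {'title': ..., 'authors': [...], ...}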
+