-rwxr-xr-x  python/persist_tool.py        |  98
-rw-r--r--  python/sandcrawler/persist.py | 223
-rwxr-xr-x  python/sandcrawler_worker.py  |  20
3 files changed, 336 insertions, 5 deletions
diff --git a/python/persist_tool.py b/python/persist_tool.py
new file mode 100755
index 0000000..d65fa53
--- /dev/null
+++ b/python/persist_tool.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python3
+
+"""
+Commands for backfilling content from bulk files into postgresql and minio.
+
+Normally this is done by workers (in sandcrawler_worker.py) consuming from
+Kafka feeds, but sometimes we have bulk processing output we want to backfill.
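+
+Illustrative invocations (file names here are hypothetical; '-' reads stdin,
+and --db-url defaults to postgres:///sandcrawler):
+
+    ./persist_tool.py cdx example.cdx
+    zcat grobid_output.json.gz | ./persist_tool.py grobid -
+    ./persist_tool.py ingest-file-result ingest_file_results.json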
+"""
+
+import sys
+import argparse
+import datetime
+import raven
+
+from sandcrawler import *
+from sandcrawler.persist import *
+
+
+def run_cdx(args):
+ worker = PersistCdxWorker(
+ db_url=args.db_url,
+ )
+ filter_mimetypes = ['application/pdf']
+ if args.no_mimetype_filter:
+ filter_mimetypes = None
+ pusher = CdxLinePusher(
+ worker,
+ args.cdx_file,
+ filter_http_statuses=[200],
+ filter_mimetypes=filter_mimetypes,
+ #allow_octet_stream
+ batch_size=200,
+ )
+ pusher.run()
+
+def run_grobid(args):
+ worker = PersistGrobidWorker(
+ db_url=args.db_url,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=50,
+ )
+ pusher.run()
+
+def run_ingest_file_result(args):
+ worker = PersistIngestFileResultWorker(
+ db_url=args.db_url,
+ )
+ pusher = JsonLinePusher(
+ worker,
+ args.json_file,
+ batch_size=200,
+ )
+ pusher.run()
+
+def main():
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ parser.add_argument('--db-url',
+ help="postgresql database connection string",
+ default="postgres:///sandcrawler")
+ subparsers = parser.add_subparsers()
+
+ sub_cdx = subparsers.add_parser('cdx',
+ help="backfill a CDX file into postgresql cdx table")
+ sub_cdx.set_defaults(func=run_cdx)
+ sub_cdx.add_argument('cdx_file',
+ help="CDX file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+ sub_cdx.add_argument('--no-mimetype-filter',
+ action='store_true',
+        help="don't filter by mimetype; insert all content types (eg, when the input is already pre-filtered)")
+
+ sub_grobid = subparsers.add_parser('grobid',
+ help="backfill a grobid JSON ('pg') dump into postgresql and minio")
+ sub_grobid.set_defaults(func=run_grobid)
+ sub_grobid.add_argument('json_file',
+ help="grobid file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+
+ sub_ingest_file_result = subparsers.add_parser('ingest-file-result',
+        help="backfill an ingest_file_result JSON dump into postgresql")
+ sub_ingest_file_result.set_defaults(func=run_ingest_file_result)
+ sub_ingest_file_result.add_argument('json_file',
+ help="ingest_file_result file to import from (or '-' for stdin)",
+ type=argparse.FileType('r'))
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ print("Tell me what to do!", file=sys.stderr)
+ sys.exit(-1)
+
+ args.func(args)
+
+if __name__ == '__main__':
+ main()
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
new file mode 100644
index 0000000..07e6c83
--- /dev/null
+++ b/python/sandcrawler/persist.py
@@ -0,0 +1,223 @@
+
+"""
+cdx
+- read raw CDX, filter
+- push to SQL table
+
+ingest-file-result
+- read JSON format (batch)
+- cdx SQL push batch (on conflict skip)
+- file_meta SQL push batch (on conflict update)
+- ingest request push batch (on conflict skip)
+- ingest result push batch (on conflict update)
+
+grobid
+- read JSON format (batch)
+- grobid2json
+- minio push (one-by-one)
+- grobid SQL push batch (on conflict update)
+- file_meta SQL push batch (on conflict update)
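+
+All of these workers only support batch mode: process() raises
+NotImplementedError, and pushers call push_batch(), which commits one database
+transaction per batch. A minimal sketch of the wiring (as done in
+persist_tool.py; the CDX file name here is hypothetical):
+
+    worker = PersistCdxWorker(db_url="postgres:///sandcrawler")
+    pusher = CdxLinePusher(worker, open("example.cdx"), batch_size=200)
+    pusher.run()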
+"""
+
+from sandcrawler.workers import SandcrawlerWorker
+from sandcrawler.db import SandcrawlerPostgresClient
+from sandcrawler.minio import SandcrawlerMinioClient
+from sandcrawler.grobid import GrobidClient
+
+
+class PersistCdxWorker(SandcrawlerWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+
+ def process(self, record):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+ self.db.insert_cdx(self.cur, batch)
+ self.counts['insert-cdx'] += len(batch)
+ self.db.commit()
+ return []
+
+class PersistIngestFileResultWorker(SandcrawlerWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+
+ def process(self, record):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def request_to_row(self, raw):
+ """
+ Converts ingest-request JSON schema (eg, from Kafka) to SQL ingest_request schema
+
+ if there is a problem with conversion, return None
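+
+        An illustrative (hypothetical) input record:
+
+            {"ingest_type": "pdf", "base_url": "https://doi.org/10.123/abc",
+             "link_source": "doi", "link_source_id": "10.123/abc",
+             "ext_ids": {"doi": "10.123/abc"}}
+
+        maps to a row with the four required columns set and the optional
+        'ext_ids' blob nested under the 'request' JSON field.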
+ """
+ # backwards compat hacks; transform request to look like current schema
+ if raw.get('ingest_type') == 'file':
+ raw['ingest_type'] = 'pdf'
+ if (not raw.get('link_source')
+ and raw.get('base_url')
+ and raw.get('ext_ids', {}).get('doi')
+ and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])):
+ # set link_source(_id) for old ingest requests
+ raw['link_source'] = 'doi'
+ raw['link_source_id'] = raw['ext_ids']['doi']
+ if (not raw.get('link_source')
+ and raw.get('ingest_request_source', '').startswith('savepapernow')
+ and raw.get('fatcat', {}).get('release_ident')):
+ # set link_source(_id) for old ingest requests
+ raw['link_source'] = 'spn'
+ raw['link_source_id'] = raw['fatcat']['release_ident']
+
+ for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'):
+            if k not in raw:
+ self.counts['skip-fields'] += 1
+ return None
+        if raw['ingest_type'] not in ('pdf', 'xml'):
+            self.counts['skip-ingest-type'] += 1
+            return None
+ request = {
+ 'ingest_type': raw['ingest_type'],
+ 'base_url': raw['base_url'],
+ 'link_source': raw['link_source'],
+ 'link_source_id': raw['link_source_id'],
+ 'request': {},
+ }
+ # extra/optional fields
+ if raw.get('release_stage'):
+ request['release_stage'] = raw['release_stage']
+ if raw.get('fatcat', {}).get('release_ident'):
+ request['request']['release_ident'] = raw['fatcat']['release_ident']
+ for k in ('ext_ids', 'edit_extra'):
+ if raw.get(k):
+ request['request'][k] = raw[k]
+ # if this dict is empty, trim it to save DB space
+ if not request['request']:
+ request['request'] = None
+ return request
+
+
+ def file_result_to_row(self, raw):
+ """
+ Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema
+
+ if there is a problem with conversion, return None and set skip count
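+
+        An illustrative (hypothetical) input record:
+
+            {"hit": true, "status": "success",
+             "request": {"ingest_type": "pdf", "base_url": "https://example.com/paper.pdf"},
+             "terminal": {"url": "https://example.com/paper.pdf", "status_code": 200}}
+
+        maps to a row with ingest_type, base_url, hit, and status set, plus
+        whichever terminal_* fields could be extracted.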
+ """
+ for k in ('request', 'hit', 'status'):
+            if k not in raw:
+                self.counts['skip-fields'] += 1
+                return None
+        if 'base_url' not in raw['request']:
+ self.counts['skip-fields'] += 1
+ return None
+ ingest_type = raw['request'].get('ingest_type')
+ if ingest_type == 'file':
+ ingest_type = 'pdf'
+ if ingest_type not in ('pdf', 'xml'):
+ self.counts['skip-ingest-type'] += 1
+ return None
+ result = {
+ 'ingest_type': ingest_type,
+ 'base_url': raw['request']['base_url'],
+ 'hit': raw['hit'],
+ 'status': raw['status'],
+ }
+ terminal = raw.get('terminal')
+ if terminal:
+ result['terminal_url'] = terminal['url']
+            if terminal.get('status_code') is None and terminal.get('http_status'):
+                terminal['status_code'] = terminal['http_status']
+            if terminal.get('status_code') is not None:
+                result['terminal_status_code'] = int(terminal['status_code'])
+ if raw.get('file_meta'):
+ result['terminal_sha1hex'] = raw['file_meta']['sha1hex']
+        if raw.get('cdx') and terminal and raw['cdx']['url'] == terminal['url']:
+ result['terminal_dt'] = raw['cdx']['datetime']
+ return result
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ if not batch:
+ return []
+
+ results = [self.file_result_to_row(raw) for raw in batch]
+ results = [r for r in results if r]
+ requests = [self.request_to_row(raw['request']) for raw in batch if raw.get('request')]
+ requests = [r for r in requests if r]
+
+ if requests:
+ self.db.insert_ingest_request(self.cur, requests)
+ self.counts['insert-requests'] += len(requests)
+ if results:
+ self.db.insert_ingest_file_result(self.cur, results)
+ self.counts['insert-results'] += len(results)
+
+ # these schemas match, so can just pass through
+ # TODO: need to include warc_path etc in ingest-result
+ cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')]
+ if cdx_batch:
+ self.db.insert_cdx(self.cur, cdx_batch)
+ self.counts['insert-cdx'] += len(cdx_batch)
+ file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
+ if file_meta_batch:
+ self.db.insert_file_meta(self.cur, file_meta_batch)
+ self.counts['insert-file_meta'] += len(file_meta_batch)
+
+ self.db.commit()
+ return []
+
+
+class PersistGrobidWorker(SandcrawlerWorker):
+
+ def __init__(self, db_url, **kwargs):
+ super().__init__()
+ self.db = SandcrawlerPostgresClient(db_url)
+ self.cur = self.db.conn.cursor()
+ self.grobid = GrobidClient()
+
+ def process(self, record):
+ """
+ Only do batches (as transactions)
+ """
+ raise NotImplementedError
+
+ def push_batch(self, batch):
+ self.counts['total'] += len(batch)
+
+ # enhance with teixml2json metadata, if available
+ for r in batch:
+            if r.get('status_code') != 200 or not r.get('tei_xml'):
+ continue
+ metadata = self.grobid.metadata(r)
+ if not metadata:
+ continue
+ for k in ('fatcat_release', 'grobid_version'):
+ r[k] = metadata.pop(k)
+ if r.get('fatcat_release'):
+ r['fatcat_release'] = r['fatcat_release'].replace('release_', '')
+ r['metadata'] = metadata
+
+        # the enriched records themselves are the grobid table rows
+        self.db.insert_grobid(self.cur, batch)
+
+        file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
+        if file_meta_batch:
+            self.db.insert_file_meta(self.cur, file_meta_batch)
+
+ # TODO: minio, grobid
+
+ self.db.commit()
+ return []
+
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index f314218..895a5b9 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -30,11 +30,10 @@ def run_grobid_extract(args):
def run_grobid_persist(args):
consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
- raise NotImplementedError
- #worker = GrobidPersistWorker()
- #pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
- # consume_topic=consume_topic, group="grobid-persist")
- #pusher.run()
+ worker = PersistGrobidWorker()
+ pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic, group="grobid-persist")
+ pusher.run()
def run_ingest_file(args):
consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
@@ -46,6 +45,13 @@ def run_ingest_file(args):
consume_topic=consume_topic, group="ingest-file", batch_size=1)
pusher.run()
+def run_ingest_file_persist(args):
+ consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
+ worker = PersistIngestFileResultWorker()
+ pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic, group="ingest-persist")
+ pusher.run()
+
def main():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -72,6 +78,10 @@ def main():
help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka")
sub_ingest_file.set_defaults(func=run_ingest_file)
+ sub_ingest_file_persist = subparsers.add_parser('ingest-file-persist',
+ help="daemon that consumes ingest-file output from Kafka and pushes to postgres")
+ sub_ingest_file_persist.set_defaults(func=run_ingest_file_persist)
+
args = parser.parse_args()
if not args.__dict__.get("func"):
print("tell me what to do!")