diff options
-rwxr-xr-x | python/persist_tool.py | 102 | ||||
-rw-r--r-- | python/sandcrawler/persist.py | 271 | ||||
-rwxr-xr-x | python/sandcrawler_worker.py | 27 |
3 files changed, 395 insertions, 5 deletions
diff --git a/python/persist_tool.py b/python/persist_tool.py
new file mode 100755
index 0000000..d65fa53
--- /dev/null
+++ b/python/persist_tool.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python3
+
+"""
+Commands for backfilling content from bulk files into postgresql and minio.
+
+Normally this is done by workers (in sandcrawler_worker.py) consuming from
+Kafka feeds, but sometimes we have bulk processing output we want to backfill.
+"""
+
+import sys
+import argparse
+import datetime
+import raven
+
+from sandcrawler import *
+from sandcrawler.persist import *
+
+
+def run_cdx(args):
+    """Backfill CDX lines from args.cdx_file through PersistCdxWorker."""
+    worker = PersistCdxWorker(
+        db_url=args.db_url,
+    )
+    filter_mimetypes = ['application/pdf']
+    if args.no_mimetype_filter:
+        filter_mimetypes = None
+    pusher = CdxLinePusher(
+        worker,
+        args.cdx_file,
+        filter_http_statuses=[200],
+        filter_mimetypes=filter_mimetypes,
+        #allow_octet_stream
+        batch_size=200,
+    )
+    pusher.run()
+
+def run_grobid(args):
+    """Backfill GROBID JSON lines from args.json_file through PersistGrobidWorker."""
+    worker = PersistGrobidWorker(
+        db_url=args.db_url,
+    )
+    pusher = JsonLinePusher(
+        worker,
+        args.json_file,
+        batch_size=50,
+    )
+    pusher.run()
+
+def run_ingest_file_result(args):
+    """Backfill ingest-file-result JSON lines through PersistIngestFileResultWorker."""
+    worker = PersistIngestFileResultWorker(
+        db_url=args.db_url,
+    )
+    pusher = JsonLinePusher(
+        worker,
+        args.json_file,
+        batch_size=200,
+    )
+    pusher.run()
+
+def main():
+    """Parse command-line arguments and run the selected backfill sub-command."""
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('--db-url',
+        help="postgresql database connection string",
+        default="postgres:///sandcrawler")
+    subparsers = parser.add_subparsers()
+
+    sub_cdx = subparsers.add_parser('cdx',
+        help="backfill a CDX file into postgresql cdx table")
+    sub_cdx.set_defaults(func=run_cdx)
+    sub_cdx.add_argument('cdx_file',
+        help="CDX file to import from (or '-' for stdin)",
+        type=argparse.FileType('r'))
+    sub_cdx.add_argument('--no-mimetype-filter',
+        action='store_true',
+        help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)")
+
+    sub_grobid = subparsers.add_parser('grobid',
+        help="backfill a grobid JSON ('pg') dump into postgresql and minio")
+    sub_grobid.set_defaults(func=run_grobid)
+    sub_grobid.add_argument('json_file',
+        help="grobid file to import from (or '-' for stdin)",
+        type=argparse.FileType('r'))
+
+    sub_ingest_file_result = subparsers.add_parser('ingest-file-result',
+        help="backfill a ingest_file_result JSON dump into postgresql")
+    sub_ingest_file_result.set_defaults(func=run_ingest_file_result)
+    sub_ingest_file_result.add_argument('json_file',
+        help="ingest_file_result file to import from (or '-' for stdin)",
+        type=argparse.FileType('r'))
+
+    args = parser.parse_args()
+    if not args.__dict__.get("func"):
+        print("Tell me what to do!", file=sys.stderr)
+        sys.exit(-1)
+
+    args.func(args)
+
+if __name__ == '__main__':
+    main()
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
new file mode 100644
index 0000000..07e6c83
--- /dev/null
+++ b/python/sandcrawler/persist.py
@@ -0,0 +1,271 @@
+
+"""
+cdx
+- read raw CDX, filter
+- push to SQL table
+
+ingest-file-result
+- read JSON format (batch)
+- cdx SQL push batch (on conflict skip)
+- file_meta SQL push batch (on conflict update)
+- ingest request push batch (on conflict skip)
+- ingest result push batch (on conflict update)
+
+grobid
+- reads JSON format (batch)
+- grobid2json
+- minio push (one-by-one)
+- grobid SQL push batch (on conflict update)
+- file_meta SQL push batch (on conflict update)
+"""
+
+import sys
+
+from sandcrawler.workers import SandcrawlerWorker
+from sandcrawler.db import SandcrawlerPostgresClient
+from sandcrawler.minio import SandcrawlerMinioClient
+from sandcrawler.grobid import GrobidClient
+
+
+class PersistCdxWorker(SandcrawlerWorker):
+    """
+    Inserts batches of CDX rows via insert_cdx, one commit per batch.
+    """
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+
+    def process(self, record):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
+    def push_batch(self, batch):
+        """
+        Inserts every record of the batch as a CDX row, then commits.
+
+        Always returns an empty list.
+        """
+        self.counts['total'] += len(batch)
+        self.db.insert_cdx(self.cur, batch)
+        self.counts['insert-cdx'] += len(batch)
+        self.db.commit()
+        return []
+
+class PersistIngestFileResultWorker(SandcrawlerWorker):
+    """
+    Persists ingest-file results, and the requests embedded in them, to SQL.
+    """
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+
+    def process(self, record):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
+    def request_to_row(self, raw):
+        """
+        Converts ingest-request JSON schema (eg, from Kafka) to SQL ingest_request schema
+
+        The backwards-compat fixups are applied to a shallow copy, so the
+        caller's dict is not modified.
+
+        if there is a problem with conversion, return None and set skip count
+        """
+        raw = dict(raw)
+        # explicit JSON nulls would otherwise break the chained lookups below
+        ext_ids = raw.get('ext_ids') or {}
+        fatcat = raw.get('fatcat') or {}
+
+        # backwards compat hacks; transform request to look like current schema
+        if raw.get('ingest_type') == 'file':
+            raw['ingest_type'] = 'pdf'
+        if (not raw.get('link_source')
+                and raw.get('base_url')
+                and ext_ids.get('doi')
+                and raw['base_url'] == "https://doi.org/{}".format(ext_ids['doi'])):
+            # set link_source(_id) for old ingest requests
+            raw['link_source'] = 'doi'
+            raw['link_source_id'] = ext_ids['doi']
+        if (not raw.get('link_source')
+                and (raw.get('ingest_request_source') or '').startswith('savepapernow')
+                and fatcat.get('release_ident')):
+            # set link_source(_id) for old ingest requests
+            raw['link_source'] = 'spn'
+            raw['link_source_id'] = fatcat['release_ident']
+
+        for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'):
+            if k not in raw:
+                self.counts['skip-fields'] += 1
+                return None
+        if raw['ingest_type'] not in ('pdf', 'xml'):
+            # diagnostics go to stderr so stdout stays clean
+            print("skipping ingest_type: {}".format(raw['ingest_type']),
+                file=sys.stderr)
+            self.counts['skip-ingest-type'] += 1
+            return None
+        request = {
+            'ingest_type': raw['ingest_type'],
+            'base_url': raw['base_url'],
+            'link_source': raw['link_source'],
+            'link_source_id': raw['link_source_id'],
+            'request': {},
+        }
+        # extra/optional fields
+        if raw.get('release_stage'):
+            request['release_stage'] = raw['release_stage']
+        if fatcat.get('release_ident'):
+            request['request']['release_ident'] = fatcat['release_ident']
+        for k in ('ext_ids', 'edit_extra'):
+            if raw.get(k):
+                request['request'][k] = raw[k]
+        # if this dict is empty, trim it to save DB space
+        if not request['request']:
+            request['request'] = None
+        return request
+
+    def file_result_to_row(self, raw):
+        """
+        Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema
+
+        if there is a problem with conversion, return None and set skip count
+        """
+        for k in ('request', 'hit', 'status'):
+            if k not in raw:
+                self.counts['skip-fields'] += 1
+                return None
+        if 'base_url' not in raw['request']:
+            self.counts['skip-fields'] += 1
+            return None
+        ingest_type = raw['request'].get('ingest_type')
+        if ingest_type == 'file':
+            ingest_type = 'pdf'
+        if ingest_type not in ('pdf', 'xml'):
+            self.counts['skip-ingest-type'] += 1
+            return None
+        result = {
+            'ingest_type': ingest_type,
+            'base_url': raw['request']['base_url'],
+            'hit': raw['hit'],
+            'status': raw['status'],
+        }
+        terminal = raw.get('terminal')
+        if terminal:
+            result['terminal_url'] = terminal['url']
+            # fall back to 'http_status' when 'status_code' is absent
+            status_code = terminal.get('status_code')
+            if status_code is None:
+                status_code = terminal.get('http_status')
+            # leave the field unset rather than crash when neither is given
+            if status_code is not None:
+                result['terminal_status_code'] = int(status_code)
+            if raw.get('file_meta'):
+                result['terminal_sha1hex'] = raw['file_meta']['sha1hex']
+            if raw.get('cdx') and raw['cdx']['url'] == terminal['url']:
+                result['terminal_dt'] = raw['cdx']['datetime']
+        return result
+
+    def push_batch(self, batch):
+        """
+        Persists a batch of ingest-file results as a single transaction.
+
+        Inserts the converted ingest requests and results, plus the cdx and
+        file_meta records of hits, then commits. Always returns an empty list.
+        """
+        self.counts['total'] += len(batch)
+
+        if not batch:
+            return []
+
+        results = [self.file_result_to_row(raw) for raw in batch]
+        results = [r for r in results if r]
+        requests = [self.request_to_row(raw['request']) for raw in batch if raw.get('request')]
+        requests = [r for r in requests if r]
+
+        if requests:
+            self.db.insert_ingest_request(self.cur, requests)
+            self.counts['insert-requests'] += len(requests)
+        if results:
+            self.db.insert_ingest_file_result(self.cur, results)
+            self.counts['insert-results'] += len(results)
+
+        # these schemas match, so can just pass through
+        # TODO: need to include warc_path etc in ingest-result
+        cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')]
+        if cdx_batch:
+            self.db.insert_cdx(self.cur, cdx_batch)
+            self.counts['insert-cdx'] += len(cdx_batch)
+        file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
+        if file_meta_batch:
+            self.db.insert_file_meta(self.cur, file_meta_batch)
+            self.counts['insert-file_meta'] += len(file_meta_batch)
+
+        self.db.commit()
+        return []
+
+
+class PersistGrobidWorker(SandcrawlerWorker):
+    """
+    Persists GROBID output records via insert_grobid and insert_file_meta.
+    """
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+        self.grobid = GrobidClient()
+
+    def process(self, record):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
+    def push_batch(self, batch):
+        """
+        Enriches GROBID records with extracted metadata and persists them.
+
+        Records are updated in place. Always returns an empty list.
+        """
+        self.counts['total'] += len(batch)
+
+        if not batch:
+            return []
+
+        # enhance with teixml2json metadata, if available
+        for r in batch:
+            if r['status_code'] != 200 or not r.get('tei_xml'):
+                continue
+            metadata = self.grobid.metadata(r)
+            if not metadata:
+                continue
+            for k in ('fatcat_release', 'grobid_version'):
+                # either key may be missing; store None instead of raising
+                r[k] = metadata.pop(k, None)
+            if r.get('fatcat_release'):
+                r['fatcat_release'] = r['fatcat_release'].replace('release_', '')
+            r['metadata'] = metadata
+
+        self.db.insert_grobid(self.cur, batch)
+        self.counts['insert-grobid'] += len(batch)
+
+        # NOTE(review): 'hit' looks like an ingest-result field; confirm grobid
+        # records carry it, otherwise file_meta is never inserted here
+        file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
+        if file_meta_batch:
+            self.db.insert_file_meta(self.cur, file_meta_batch)
+            self.counts['insert-file_meta'] += len(file_meta_batch)
+
+        # TODO: minio, grobid
+
+        self.db.commit()
+        return []
+
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index f314218..895a5b9 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -30,11 +30,14 @@ def run_grobid_extract(args):
 
 def run_grobid_persist(args):
     consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
-    raise NotImplementedError
-    #worker = GrobidPersistWorker()
-    #pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
-    #    consume_topic=consume_topic, group="grobid-persist")
-    #pusher.run()
+    # NOTE(review): the worker requires db_url, but no --db-url option is
+    # visible in this CLI; falls back to persist_tool.py's default -- confirm
+    worker = PersistGrobidWorker(
+        db_url=getattr(args, 'db_url', "postgres:///sandcrawler"),
+    )
+    pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic, group="grobid-persist")
+    pusher.run()
 
 def run_ingest_file(args):
     consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
@@ -46,6 +49,16 @@ def run_ingest_file(args):
         consume_topic=consume_topic, group="ingest-file", batch_size=1)
     pusher.run()
 
+def run_ingest_file_persist(args):
+    consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
+    # NOTE(review): same db_url fallback as in run_grobid_persist -- confirm
+    worker = PersistIngestFileResultWorker(
+        db_url=getattr(args, 'db_url', "postgres:///sandcrawler"),
+    )
+    pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic, group="ingest-persist")
+    pusher.run()
+
 def main():
     parser = argparse.ArgumentParser(
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -72,6 +85,10 @@ def main():
         help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka")
     sub_ingest_file.set_defaults(func=run_ingest_file)
 
+    sub_ingest_file_persist = subparsers.add_parser('ingest-file-persist',
+        help="daemon that consumes ingest-file output from Kafka and pushes to postgres")
+    sub_ingest_file_persist.set_defaults(func=run_ingest_file_persist)
+
     args = parser.parse_args()
     if not args.__dict__.get("func"):
         print("tell me what to do!")