| | | |
|---|---|---|
| author | Bryan Newbold <bnewbold@archive.org> | 2019-12-24 16:39:14 -0800 |
| committer | Bryan Newbold <bnewbold@archive.org> | 2020-01-02 18:12:58 -0800 |
| commit | 460843e31ebea16fcb543b8448365cfe004103b0 | |
| tree | f5576149afa91b6807decb048e2d5ba461a2a6da /python | |
| parent | 4a3eade72ea557eb5e04c59c454887d20c718314 | |
start work on persist workers and tool
Diffstat (limited to 'python')

| file mode | path | lines changed |
|---|---|---|
| -rwxr-xr-x | python/persist_tool.py | 98 |
| -rw-r--r-- | python/sandcrawler/persist.py | 223 |
| -rwxr-xr-x | python/sandcrawler_worker.py | 20 |

3 files changed, 336 insertions, 5 deletions
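
For reference, a minimal sketch of the CDX backfill path that the new persist_tool.py (in the diff below) wires up. The CDX filename is a placeholder, the import locations are assumptions based on the tool's star imports, and the db_url is the tool's default:

```python
# Sketch mirroring run_cdx() in persist_tool.py below; "example.cdx" is a
# placeholder (the tool passes an argparse FileType handle in this position).
from sandcrawler import CdxLinePusher            # assumed package-level export
from sandcrawler.persist import PersistCdxWorker

worker = PersistCdxWorker(db_url="postgres:///sandcrawler")
pusher = CdxLinePusher(
    worker,
    open("example.cdx", "r"),
    filter_http_statuses=[200],            # only successful captures
    filter_mimetypes=["application/pdf"],  # disabled by --no-mimetype-filter
    batch_size=200,                        # each batch is inserted and committed together
)
pusher.run()
```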
diff --git a/python/persist_tool.py b/python/persist_tool.py
new file mode 100755
index 0000000..d65fa53
--- /dev/null
+++ b/python/persist_tool.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python3
+
+"""
+Commands for backfilling content from bulk files into postgresql and minio.
+
+Normally this is done by workers (in sandcrawler_worker.py) consuming from
+Kafka feeds, but sometimes we have bulk processing output we want to backfill.
+"""
+
+import sys
+import argparse
+import datetime
+import raven
+
+from sandcrawler import *
+from sandcrawler.persist import *
+
+
+def run_cdx(args):
+    worker = PersistCdxWorker(
+        db_url=args.db_url,
+    )
+    filter_mimetypes = ['application/pdf']
+    if args.no_mimetype_filter:
+        filter_mimetypes = None
+    pusher = CdxLinePusher(
+        worker,
+        args.cdx_file,
+        filter_http_statuses=[200],
+        filter_mimetypes=filter_mimetypes,
+        #allow_octet_stream
+        batch_size=200,
+    )
+    pusher.run()
+
+def run_grobid(args):
+    worker = PersistGrobidWorker(
+        db_url=args.db_url,
+    )
+    pusher = JsonLinePusher(
+        worker,
+        args.json_file,
+        batch_size=50,
+    )
+    pusher.run()
+
+def run_ingest_file_result(args):
+    worker = PersistIngestFileResultWorker(
+        db_url=args.db_url,
+    )
+    pusher = JsonLinePusher(
+        worker,
+        args.json_file,
+        batch_size=200,
+    )
+    pusher.run()
+
+def main():
+    parser = argparse.ArgumentParser(
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('--db-url',
+        help="postgresql database connection string",
+        default="postgres:///sandcrawler")
+    subparsers = parser.add_subparsers()
+
+    sub_cdx = subparsers.add_parser('cdx',
+        help="backfill a CDX file into postgresql cdx table")
+    sub_cdx.set_defaults(func=run_cdx)
+    sub_cdx.add_argument('cdx_file',
+        help="CDX file to import from (or '-' for stdin)",
+        type=argparse.FileType('r'))
+    sub_cdx.add_argument('--no-mimetype-filter',
+        action='store_true',
+        help="ignore mimetype filtering; insert all content types (eg, assuming pre-filtered)")
+
+    sub_grobid = subparsers.add_parser('grobid',
+        help="backfill a grobid JSON ('pg') dump into postgresql and minio")
+    sub_grobid.set_defaults(func=run_grobid)
+    sub_grobid.add_argument('json_file',
+        help="grobid file to import from (or '-' for stdin)",
+        type=argparse.FileType('r'))
+
+    sub_ingest_file_result = subparsers.add_parser('ingest-file-result',
+        help="backfill a ingest_file_result JSON dump into postgresql")
+    sub_ingest_file_result.set_defaults(func=run_ingest_file_result)
+    sub_ingest_file_result.add_argument('json_file',
+        help="ingest_file_result file to import from (or '-' for stdin)",
+        type=argparse.FileType('r'))
+
+    args = parser.parse_args()
+    if not args.__dict__.get("func"):
+        print("Tell me what to do!", file=sys.stderr)
+        sys.exit(-1)
+
+    args.func(args)
+
+if __name__ == '__main__':
+    main()
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
new file mode 100644
index 0000000..07e6c83
--- /dev/null
+++ b/python/sandcrawler/persist.py
@@ -0,0 +1,223 @@
+
+"""
+cdx
+- read raw CDX, filter
+- push to SQL table
+
+ingest-file-result
+- read JSON format (batch)
+- cdx SQL push batch (on conflict skip)
+- file_meta SQL push batch (on conflict update)
+- ingest request push batch (on conflict skip)
+- ingest result push batch (on conflict update)
+
+grobid
+- reads JSON format (batch)
+- grobid2json
+- minio push (one-by-one)
+- grobid SQL push batch (on conflict update)
+- file_meta SQL push batch (on conflict update)
+"""
+
+from sandcrawler.workers import SandcrawlerWorker
+from sandcrawler.db import SandcrawlerPostgresClient
+from sandcrawler.minio import SandcrawlerMinioClient
+from sandcrawler.grobid import GrobidClient
+
+
+class PersistCdxWorker(SandcrawlerWorker):
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+
+    def process(self, record):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
+    def push_batch(self, batch):
+        self.counts['total'] += len(batch)
+        self.db.insert_cdx(self.cur, batch)
+        self.counts['insert-cdx'] += len(batch)
+        self.db.commit()
+        return []
+
+class PersistIngestFileResultWorker(SandcrawlerWorker):
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+
+    def process(self, record):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
+    def request_to_row(self, raw):
+        """
+        Converts ingest-request JSON schema (eg, from Kafka) to SQL ingest_request schema
+
+        if there is a problem with conversion, return None
+        """
+        # backwards compat hacks; transform request to look like current schema
+        if raw.get('ingest_type') == 'file':
+            raw['ingest_type'] = 'pdf'
+        if (not raw.get('link_source')
+                and raw.get('base_url')
+                and raw.get('ext_ids', {}).get('doi')
+                and raw['base_url'] == "https://doi.org/{}".format(raw['ext_ids']['doi'])):
+            # set link_source(_id) for old ingest requests
+            raw['link_source'] = 'doi'
+            raw['link_source_id'] = raw['ext_ids']['doi']
+        if (not raw.get('link_source')
+                and raw.get('ingest_request_source', '').startswith('savepapernow')
+                and raw.get('fatcat', {}).get('release_ident')):
+            # set link_source(_id) for old ingest requests
+            raw['link_source'] = 'spn'
+            raw['link_source_id'] = raw['fatcat']['release_ident']
+
+        for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'):
+            if not k in raw:
+                self.counts['skip-fields'] += 1
+                return None
+        if raw['ingest_type'] not in ('pdf', 'xml'):
+            print(raw['ingest_type'])
+            self.counts['skip-ingest-type'] += 1
+            return None
+        request = {
+            'ingest_type': raw['ingest_type'],
+            'base_url': raw['base_url'],
+            'link_source': raw['link_source'],
+            'link_source_id': raw['link_source_id'],
+            'request': {},
+        }
+        # extra/optional fields
+        if raw.get('release_stage'):
+            request['release_stage'] = raw['release_stage']
+        if raw.get('fatcat', {}).get('release_ident'):
+            request['request']['release_ident'] = raw['fatcat']['release_ident']
+        for k in ('ext_ids', 'edit_extra'):
+            if raw.get(k):
+                request['request'][k] = raw[k]
+        # if this dict is empty, trim it to save DB space
+        if not request['request']:
+            request['request'] = None
+        return request
+
+
+    def file_result_to_row(self, raw):
+        """
+        Converts ingest-result JSON schema (eg, from Kafka) to SQL ingest_file_result schema
+
+        if there is a problem with conversion, return None and set skip count
+        """
+        for k in ('request', 'hit', 'status'):
+            if not k in raw:
+                self.counts['skip-fields'] += 1
+                return None
+        if not 'base_url' in raw['request']:
+            self.counts['skip-fields'] += 1
+            return None
+        ingest_type = raw['request'].get('ingest_type')
+        if ingest_type == 'file':
+            ingest_type = 'pdf'
+        if ingest_type not in ('pdf', 'xml'):
+            self.counts['skip-ingest-type'] += 1
+            return None
+        result = {
+            'ingest_type': ingest_type,
+            'base_url': raw['request']['base_url'],
+            'hit': raw['hit'],
+            'status': raw['status'],
+        }
+        terminal = raw.get('terminal')
+        if terminal:
+            result['terminal_url'] = terminal['url']
+            if terminal.get('status_code') == None and terminal.get('http_status'):
+                terminal['status_code'] = terminal['http_status']
+            result['terminal_status_code'] = int(terminal['status_code'])
+            if raw.get('file_meta'):
+                result['terminal_sha1hex'] = raw['file_meta']['sha1hex']
+            if raw.get('cdx') and raw['cdx']['url'] == terminal['url']:
+                result['terminal_dt'] = raw['cdx']['datetime']
+        return result
+
+    def push_batch(self, batch):
+        self.counts['total'] += len(batch)
+
+        if not batch:
+            return []
+
+        results = [self.file_result_to_row(raw) for raw in batch]
+        results = [r for r in results if r]
+        requests = [self.request_to_row(raw['request']) for raw in batch if raw.get('request')]
+        requests = [r for r in requests if r]
+
+        if requests:
+            self.db.insert_ingest_request(self.cur, requests)
+            self.counts['insert-requests'] += len(requests)
+        if results:
+            self.db.insert_ingest_file_result(self.cur, results)
+            self.counts['insert-results'] += len(results)
+
+        # these schemas match, so can just pass through
+        # TODO: need to include warc_path etc in ingest-result
+        cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')]
+        if cdx_batch:
+            self.db.insert_cdx(self.cur, cdx_batch)
+            self.counts['insert-cdx'] += len(cdx_batch)
+        file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
+        if file_meta_batch:
+            self.db.insert_file_meta(self.cur, file_meta_batch)
+            self.counts['insert-file_meta'] += len(file_meta_batch)
+
+        self.db.commit()
+        return []
+
+
+class PersistGrobidWorker(SandcrawlerWorker):
+
+    def __init__(self, db_url, **kwargs):
+        super().__init__()
+        self.db = SandcrawlerPostgresClient(db_url)
+        self.cur = self.db.conn.cursor()
+        self.grobid = GrobidClient()
+
+    def process(self, record):
+        """
+        Only do batches (as transactions)
+        """
+        raise NotImplementedError
+
+    def push_batch(self, batch):
+        self.counts['total'] += len(batch)
+
+        # enhance with teixml2json metadata, if available
+        for r in batch:
+            if r['status_code'] != 200 or not r.get('tei_xml'):
+                continue
+            metadata = self.grobid.metadata(r)
+            if not metadata:
+                continue
+            for k in ('fatcat_release', 'grobid_version'):
+                r[k] = metadata.pop(k)
+            if r.get('fatcat_release'):
+                r['fatcat_release'] = r['fatcat_release'].replace('release_', '')
+            r['metadata'] = metadata
+
+        grobid_batch = [r['grobid'] for r in batch if r.get('grobid')]
+        self.db.insert_grobid(self.cur, batch)
+
+        file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
+        self.db.insert_file_meta(self.cur, file_meta_batch)
+
+        # TODO: minio, grobid
+
+        self.db.commit()
+        return []
+
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index f314218..895a5b9 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -30,11 +30,10 @@ def run_grobid_extract(args):
 
 def run_grobid_persist(args):
     consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
-    raise NotImplementedError
-    #worker = GrobidPersistWorker()
-    #pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
-    #    consume_topic=consume_topic, group="grobid-persist")
-    #pusher.run()
+    worker = PersistGrobidWorker()
+    pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic, group="grobid-persist")
+    pusher.run()
 
 def run_ingest_file(args):
     consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
@@ -46,6 +45,13 @@ def run_ingest_file(args):
         consume_topic=consume_topic, group="ingest-file", batch_size=1)
     pusher.run()
 
+def run_ingest_file_persist(args):
+    consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
+    worker = PersistIngestFileResultWorker()
+    pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic, group="ingest-persist")
+    pusher.run()
+
 def main():
     parser = argparse.ArgumentParser(
         formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -72,6 +78,10 @@ def main():
         help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka")
     sub_ingest_file.set_defaults(func=run_ingest_file)
 
+    sub_ingest_file_persist = subparsers.add_parser('ingest-file-persist',
+        help="daemon that consumes ingest-file output from Kafka and pushes to postgres")
+    sub_ingest_file_persist.set_defaults(func=run_ingest_file_persist)
+
     args = parser.parse_args()
     if not args.__dict__.get("func"):
         print("tell me what to do!")
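
To make the backwards-compat handling in PersistIngestFileResultWorker.request_to_row() above concrete, here is roughly how an old-style Kafka ingest request gets normalized into the ingest_request SQL schema; the DOI and release ident are invented for illustration:

```python
# Hypothetical old-style ingest request (DOI and release ident are made up).
old_request = {
    'ingest_type': 'file',                      # legacy value, rewritten to 'pdf'
    'base_url': 'https://doi.org/10.123/abc',
    'ext_ids': {'doi': '10.123/abc'},
    'fatcat': {'release_ident': 'aaaabbbbccccddddeeeeffffgg'},
    'release_stage': 'published',
}

# Approximate row produced by request_to_row(): link_source/link_source_id are
# back-filled from the DOI because base_url matches "https://doi.org/{doi}",
# and the remaining optional fields go into the 'request' JSON column.
expected_row = {
    'ingest_type': 'pdf',
    'base_url': 'https://doi.org/10.123/abc',
    'link_source': 'doi',
    'link_source_id': '10.123/abc',
    'release_stage': 'published',
    'request': {
        'release_ident': 'aaaabbbbccccddddeeeeffffgg',
        'ext_ids': {'doi': '10.123/abc'},
    },
}
```

Requests that still lack a link_source (or any other required field) after these rewrites are counted under skip-fields and dropped rather than inserted.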
