From d4f40c4401e479f6db366ea104687938b9d2345e Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 26 Dec 2019 21:32:56 -0800 Subject: fix sandcrawler persist workers --- python/sandcrawler/__init__.py | 1 + python/sandcrawler_worker.py | 44 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/python/sandcrawler/__init__.py b/python/sandcrawler/__init__.py index c9cc0c9..236570e 100644 --- a/python/sandcrawler/__init__.py +++ b/python/sandcrawler/__init__.py @@ -4,4 +4,5 @@ from .misc import gen_file_metadata, b32_hex, parse_cdx_line, parse_cdx_datetime from .workers import KafkaSink, KafkaGrobidSink, JsonLinePusher, CdxLinePusher, CdxLinePusher, KafkaJsonPusher, BlackholeSink, ZipfilePusher, MultiprocessWrapper from .ia import WaybackClient, WaybackError, CdxApiClient, CdxApiError, SavePageNowClient, SavePageNowError, SavePageNowRemoteError from .ingest import IngestFileWorker +from .persist import PersistCdxWorker, PersistIngestFileResultWorker, PersistGrobidWorker, PersistGrobidDiskWorker diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index 895a5b9..c926911 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -6,6 +6,7 @@ Outputs might either be pushed back into Kafka, or directly into sandcrawler-db or minio. """ +import os import sys import argparse import datetime @@ -28,9 +29,16 @@ def run_grobid_extract(args): consume_topic=consume_topic, group="grobid-extract") pusher.run() -def run_grobid_persist(args): +def run_persist_grobid(args): consume_topic = "sandcrawler-{}.grobid-output-pg".format(args.env) - worker = PersistGrobidWorker() + worker = PersistGrobidWorker( + db_url=args.db_url, + s3_url=args.s3_url, + s3_bucket=args.s3_bucket, + s3_access_key=args.s3_access_key, + s3_secret_key=args.s3_secret_key, + s3_only=args.s3_only, + ) pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts, consume_topic=consume_topic, group="grobid-persist") pusher.run() @@ -45,9 +53,11 @@ def run_ingest_file(args): consume_topic=consume_topic, group="ingest-file", batch_size=1) pusher.run() -def run_ingest_file_persist(args): +def run_persist_ingest_file(args): consume_topic = "sandcrawler-{}.ingest-file-results".format(args.env) - worker = PersistIngestFileResultWorker() + worker = PersistIngestFileResultWorker( + db_url=args.db_url, + ) pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts, consume_topic=consume_topic, group="ingest-persist") pusher.run() @@ -64,23 +74,41 @@ def main(): parser.add_argument('--grobid-host', default="http://grobid.qa.fatcat.wiki", help="GROBID API host/port") + parser.add_argument('--db-url', + help="postgresql database connection string", + default="postgres:///sandcrawler") + parser.add_argument('--s3-url', + help="S3 (minio) backend URL", + default="localhost:9000") + parser.add_argument('--s3-access-key', + help="S3 (minio) credential", + default=os.environ.get('MINIO_ACCESS_KEY')) + parser.add_argument('--s3-secret-key', + help="S3 (minio) credential", + default=os.environ.get('MINIO_SECRET_KEY')) + parser.add_argument('--s3-bucket', + help="S3 (minio) bucket to persist into", + default="sandcrawler-dev") subparsers = parser.add_subparsers() sub_grobid_extract = subparsers.add_parser('grobid-extract', help="daemon that consumes CDX JSON objects from Kafka, extracts, pushes to Kafka") sub_grobid_extract.set_defaults(func=run_grobid_extract) - sub_grobid_persist = subparsers.add_parser('grobid-persist', + sub_persist_grobid = subparsers.add_parser('persist-grobid', help="daemon that consumes GROBID output from Kafka and pushes to minio and postgres") - sub_grobid_persist.set_defaults(func=run_grobid_persist) + sub_persist_grobid.add_argument('--s3-only', + action='store_true', + help="only upload TEI-XML to S3 (don't write to database)") + sub_persist_grobid.set_defaults(func=run_persist_grobid) sub_ingest_file = subparsers.add_parser('ingest-file', help="daemon that consumes requests from Kafka, ingests, pushes results to Kafka") sub_ingest_file.set_defaults(func=run_ingest_file) - sub_ingest_file_persist = subparsers.add_parser('ingest-file-persist', + sub_persist_ingest_file = subparsers.add_parser('persist-ingest-file', help="daemon that consumes ingest-file output from Kafka and pushes to postgres") - sub_ingest_file_persist.set_defaults(func=run_ingest_file_persist) + sub_persist_ingest_file.set_defaults(func=run_persist_ingest_file) args = parser.parse_args() if not args.__dict__.get("func"): -- cgit v1.2.3