From acea0838caa93f194caa380a6211bf57cc8fc5bf Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Fri, 27 Dec 2019 16:35:56 -0800
Subject: update persist worker invocation to use batches

---
 python/sandcrawler_worker.py | 70 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 55 insertions(+), 15 deletions(-)

diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index c926911..26735ce 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -21,12 +21,27 @@ sentry_client = raven.Client()
 def run_grobid_extract(args):
     consume_topic = "sandcrawler-{}.ungrobided-pg".format(args.env)
     produce_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
-    sink = KafkaSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic)
-    grobid_client = GrobidClient(host_url=args.grobid_host)
-    wayback_client = WaybackClient(host_url=args.grobid_host)
-    worker = GrobidWorker(grobid_client=grobid_client, wayback_client=wayback_client, sink=sink)
-    pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
-        consume_topic=consume_topic, group="grobid-extract")
+    sink = KafkaSink(
+        kafka_hosts=args.kafka_hosts,
+        produce_topic=produce_topic,
+    )
+    grobid_client = GrobidClient(
+        host_url=args.grobid_host,
+    )
+    wayback_client = WaybackClient(
+        host_url=args.grobid_host,
+    )
+    worker = GrobidWorker(
+        grobid_client=grobid_client,
+        wayback_client=wayback_client,
+        sink=sink,
+    )
+    pusher = KafkaJsonPusher(
+        worker=worker,
+        kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic,
+        group="grobid-extract",
+    )
     pusher.run()
 
 def run_persist_grobid(args):
@@ -39,18 +54,37 @@ def run_persist_grobid(args):
         s3_secret_key=args.s3_secret_key,
         s3_only=args.s3_only,
     )
-    pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
-        consume_topic=consume_topic, group="grobid-persist")
+    pusher = KafkaJsonPusher(
+        worker=worker,
+        kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic,
+        group="persist-grobid",
+        push_batches=True,
+        batch_size=25,
+    )
     pusher.run()
 
 def run_ingest_file(args):
     consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
     produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
-    sink = KafkaSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic)
-    grobid_client = GrobidClient(host_url=args.grobid_host)
-    worker = IngestFileWorker(grobid_client=grobid_client, sink=sink)
-    pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
-        consume_topic=consume_topic, group="ingest-file", batch_size=1)
+    sink = KafkaSink(
+        kafka_hosts=args.kafka_hosts,
+        produce_topic=produce_topic,
+    )
+    grobid_client = GrobidClient(
+        host_url=args.grobid_host,
+    )
+    worker = IngestFileWorker(
+        grobid_client=grobid_client,
+        sink=sink,
+    )
+    pusher = KafkaJsonPusher(
+        worker=worker,
+        kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic,
+        group="ingest-file",
+        batch_size=1,
+    )
     pusher.run()
 
 def run_persist_ingest_file(args):
@@ -58,8 +92,14 @@ def run_persist_ingest_file(args):
     worker = PersistIngestFileResultWorker(
         db_url=args.db_url,
     )
-    pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
-        consume_topic=consume_topic, group="ingest-persist")
+    pusher = KafkaJsonPusher(
+        worker=worker,
+        kafka_hosts=args.kafka_hosts,
+        consume_topic=consume_topic,
+        group="persist-ingest",
+        push_batches=True,
+        batch_size=100,
+    )
     pusher.run()
 
 def main():
-- 
cgit v1.2.3
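
For context: the new push_batches=True / batch_size arguments switch the persist pushers from handing the worker one decoded record at a time to handing it whole lists of records. KafkaJsonPusher itself is defined in sandcrawler's workers module and is not shown in this diff; the sketch below is only an illustration, under that assumption and with hypothetical names (push_in_batches, worker.push_batch), of the batching behavior those arguments imply:

    import json

    def push_in_batches(worker, raw_messages, batch_size=100):
        # Rough sketch, not the actual sandcrawler implementation: decode JSON
        # records from a stream of Kafka-style message payloads and hand them
        # to the worker in lists of up to batch_size, instead of one at a time.
        batch = []
        for raw in raw_messages:
            batch.append(json.loads(raw))
            if len(batch) >= batch_size:
                worker.push_batch(batch)  # assumes a batch-oriented worker method
                batch = []
        if batch:
            worker.push_batch(batch)

Larger batches let the persist workers issue fewer, bigger database inserts (25 records per batch for GROBID output, 100 for ingest-file results above), at the cost of more work being redone if a batch fails partway through.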