diff options
author | Bryan Newbold <bnewbold@archive.org> | 2019-12-27 16:35:56 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-01-02 18:12:58 -0800 |
commit | acea0838caa93f194caa380a6211bf57cc8fc5bf (patch) | |
tree | 71bd454aaa0aa606f956fdbb049085ed8b062cee /python | |
parent | 24df2167b294973ce230199370b7b061e6ae4498 (diff) | |
download | sandcrawler-acea0838caa93f194caa380a6211bf57cc8fc5bf.tar.gz sandcrawler-acea0838caa93f194caa380a6211bf57cc8fc5bf.zip |
update persist worker invocation to use batches
Diffstat (limited to 'python')
-rwxr-xr-x | python/sandcrawler_worker.py | 70 |
1 files changed, 55 insertions, 15 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index c926911..26735ce 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -21,12 +21,27 @@ sentry_client = raven.Client() def run_grobid_extract(args): consume_topic = "sandcrawler-{}.ungrobided-pg".format(args.env) produce_topic = "sandcrawler-{}.grobid-output-pg".format(args.env) - sink = KafkaSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic) - grobid_client = GrobidClient(host_url=args.grobid_host) - wayback_client = WaybackClient(host_url=args.grobid_host) - worker = GrobidWorker(grobid_client=grobid_client, wayback_client=wayback_client, sink=sink) - pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts, - consume_topic=consume_topic, group="grobid-extract") + sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=produce_topic, + ) + grobid_client = GrobidClient( + host_url=args.grobid_host, + ) + wayback_client = WaybackClient( + host_url=args.grobid_host, + ) + worker = GrobidWorker( + grobid_client=grobid_client, + wayback_client=wayback_client, + sink=sink, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="grobid-extract", + ) pusher.run() def run_persist_grobid(args): @@ -39,18 +54,37 @@ def run_persist_grobid(args): s3_secret_key=args.s3_secret_key, s3_only=args.s3_only, ) - pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts, - consume_topic=consume_topic, group="grobid-persist") + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="persist-grobid", + push_batches=True, + batch_size=25, + ) pusher.run() def run_ingest_file(args): consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env) produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env) - sink = KafkaSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic) - grobid_client = GrobidClient(host_url=args.grobid_host) - worker = IngestFileWorker(grobid_client=grobid_client, sink=sink) - pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts, - consume_topic=consume_topic, group="ingest-file", batch_size=1) + sink = KafkaSink( + kafka_hosts=args.kafka_hosts, + produce_topic=produce_topic, + ) + grobid_client = GrobidClient( + host_url=args.grobid_host, + ) + worker = IngestFileWorker( + grobid_client=grobid_client, + sink=sink, + ) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="ingest-file", + batch_size=1, + ) pusher.run() def run_persist_ingest_file(args): @@ -58,8 +92,14 @@ def run_persist_ingest_file(args): worker = PersistIngestFileResultWorker( db_url=args.db_url, ) - pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts, - consume_topic=consume_topic, group="ingest-persist") + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="persist-ingest", + push_batches=True, + batch_size=100, + ) pusher.run() def main(): |