author     Bryan Newbold <bnewbold@archive.org>    2019-12-27 16:35:56 -0800
committer  Bryan Newbold <bnewbold@archive.org>    2020-01-02 18:12:58 -0800
commit     acea0838caa93f194caa380a6211bf57cc8fc5bf (patch)
tree       71bd454aaa0aa606f956fdbb049085ed8b062cee /python/sandcrawler_worker.py
parent     24df2167b294973ce230199370b7b061e6ae4498 (diff)
update persist worker invocation to use batches
Diffstat (limited to 'python/sandcrawler_worker.py')
-rwxr-xr-x  python/sandcrawler_worker.py  70
1 file changed, 55 insertions(+), 15 deletions(-)
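
The commit switches the two persist workers (GROBID output and ingest-file results) from record-at-a-time pushes to batched pushes via the new push_batches and batch_size arguments to KafkaJsonPusher. A minimal sketch of that pattern follows; the class names, the push_record/push_batch methods, and the buffering logic here are illustrative assumptions, not the actual KafkaJsonPusher internals from the sandcrawler package:

# Sketch of the batched-push pattern implied by push_batches/batch_size.
# Hypothetical names; the real pusher and persist workers live in the
# sandcrawler package and may differ.

class SketchPersistWorker:

    def push_record(self, record):
        # single-record path, used when push_batches is False
        self.push_batch([record])

    def push_batch(self, batch):
        # batched path: one database/S3 round trip per batch of records
        print("persisting {} records".format(len(batch)))


class SketchJsonPusher:

    def __init__(self, worker, push_batches=False, batch_size=1):
        self.worker = worker
        self.push_batches = push_batches
        self.batch_size = batch_size

    def run(self, records):
        if not self.push_batches:
            for record in records:
                self.worker.push_record(record)
            return
        # accumulate up to batch_size records, then hand the whole list over
        buf = []
        for record in records:
            buf.append(record)
            if len(buf) >= self.batch_size:
                self.worker.push_batch(buf)
                buf = []
        if buf:
            self.worker.push_batch(buf)


if __name__ == "__main__":
    pusher = SketchJsonPusher(SketchPersistWorker(), push_batches=True, batch_size=25)
    pusher.run([{"sha1": i} for i in range(60)])
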
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py
index c926911..26735ce 100755
--- a/python/sandcrawler_worker.py
+++ b/python/sandcrawler_worker.py
@@ -21,12 +21,27 @@ sentry_client = raven.Client()
def run_grobid_extract(args):
consume_topic = "sandcrawler-{}.ungrobided-pg".format(args.env)
produce_topic = "sandcrawler-{}.grobid-output-pg".format(args.env)
- sink = KafkaSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic)
- grobid_client = GrobidClient(host_url=args.grobid_host)
- wayback_client = WaybackClient(host_url=args.grobid_host)
- worker = GrobidWorker(grobid_client=grobid_client, wayback_client=wayback_client, sink=sink)
- pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
- consume_topic=consume_topic, group="grobid-extract")
+ sink = KafkaSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=produce_topic,
+ )
+ grobid_client = GrobidClient(
+ host_url=args.grobid_host,
+ )
+ wayback_client = WaybackClient(
+ host_url=args.grobid_host,
+ )
+ worker = GrobidWorker(
+ grobid_client=grobid_client,
+ wayback_client=wayback_client,
+ sink=sink,
+ )
+ pusher = KafkaJsonPusher(
+ worker=worker,
+ kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic,
+ group="grobid-extract",
+ )
pusher.run()
def run_persist_grobid(args):
@@ -39,18 +54,37 @@ def run_persist_grobid(args):
s3_secret_key=args.s3_secret_key,
s3_only=args.s3_only,
)
- pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
- consume_topic=consume_topic, group="grobid-persist")
+ pusher = KafkaJsonPusher(
+ worker=worker,
+ kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic,
+ group="persist-grobid",
+ push_batches=True,
+ batch_size=25,
+ )
pusher.run()
def run_ingest_file(args):
consume_topic = "sandcrawler-{}.ingest-file-requests".format(args.env)
produce_topic = "sandcrawler-{}.ingest-file-results".format(args.env)
- sink = KafkaSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic)
- grobid_client = GrobidClient(host_url=args.grobid_host)
- worker = IngestFileWorker(grobid_client=grobid_client, sink=sink)
- pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
- consume_topic=consume_topic, group="ingest-file", batch_size=1)
+ sink = KafkaSink(
+ kafka_hosts=args.kafka_hosts,
+ produce_topic=produce_topic,
+ )
+ grobid_client = GrobidClient(
+ host_url=args.grobid_host,
+ )
+ worker = IngestFileWorker(
+ grobid_client=grobid_client,
+ sink=sink,
+ )
+ pusher = KafkaJsonPusher(
+ worker=worker,
+ kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic,
+ group="ingest-file",
+ batch_size=1,
+ )
pusher.run()
def run_persist_ingest_file(args):
@@ -58,8 +92,14 @@ def run_persist_ingest_file(args):
worker = PersistIngestFileResultWorker(
db_url=args.db_url,
)
- pusher = KafkaJsonPusher(worker=worker, kafka_hosts=args.kafka_hosts,
- consume_topic=consume_topic, group="ingest-persist")
+ pusher = KafkaJsonPusher(
+ worker=worker,
+ kafka_hosts=args.kafka_hosts,
+ consume_topic=consume_topic,
+ group="persist-ingest",
+ push_batches=True,
+ batch_size=100,
+ )
pusher.run()
def main():
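
The two persist pushers use different batch sizes: 25 for GROBID output and 100 for ingest-file results. A hedged sketch of how one such batch might be written to the database in a single round trip, assuming a psycopg2-style connection; the table and column names are hypothetical, and the real PersistIngestFileResultWorker logic lives elsewhere in this repository:

def persist_ingest_batch(db_conn, batch):
    # Sketch only: write a whole consumer batch with one executemany() call
    # and a single commit, instead of one INSERT/commit per Kafka message.
    # "ingest_result_sketch" and its columns are hypothetical.
    rows = [
        (record.get("request", {}).get("base_url"), record.get("status"))
        for record in batch
    ]
    with db_conn.cursor() as cur:
        cur.executemany(
            "INSERT INTO ingest_result_sketch (base_url, status) VALUES (%s, %s)",
            rows,
        )
    db_conn.commit()
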