From 71756038bc376568f7bcf124b6f8a23fc9221594 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 26 Sep 2019 15:21:00 -0700 Subject: small improvements to GROBID tool --- python/grobid_tool.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'python/grobid_tool.py') diff --git a/python/grobid_tool.py b/python/grobid_tool.py index c9bcdc8..e7a7e5c 100755 --- a/python/grobid_tool.py +++ b/python/grobid_tool.py @@ -17,7 +17,7 @@ def run_extract_json(args): wayback_client = WaybackClient() worker = GrobidWorker(grobid_client, wayback_client, sink=None) multi_worker = MultiprocessWrapper(worker, args.sink) - pusher = JsonBatchPusher(worker, args.json_file, batch_size=30) + pusher = JsonLinePusher(multi_worker, args.json_file, batch_size=args.jobs) pusher.run() def run_extract_cdx(args): @@ -27,7 +27,7 @@ def run_extract_cdx(args): multi_worker = MultiprocessWrapper(worker, args.sink) pusher = CdxLinePusher(multi_worker, args.cdx_file, filter_http_statuses=[200], filter_mimetypes=['application/pdf'], - batch_size=30) + batch_size=args.jobs) pusher.run() def run_extract_zipfile(args): @@ -47,6 +47,9 @@ def main(): parser.add_argument('--kafka-env', default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)") + parser.add_argument('-j', '--jobs', + default=8, + help="parallelism for batch CPU jobs") parser.add_argument('--grobid-host', default="http://grobid.qa.fatcat.wiki", help="GROBID API host/port") @@ -78,6 +81,7 @@ def main(): args.sink = None if args.kafka_mode: produce_topic = "sandcrawler-{}.grobid-output-json".format(args.kafka_env) + print("Running in kafka output mode, publishing to {}\n".format(produce_topic)) args.sink = KafkaGrobidSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic) -- cgit v1.2.3