From 71756038bc376568f7bcf124b6f8a23fc9221594 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 26 Sep 2019 15:21:00 -0700 Subject: small improvements to GROBID tool --- python/grobid_tool.py | 8 ++++++-- python/sandcrawler/grobid.py | 4 ++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/python/grobid_tool.py b/python/grobid_tool.py index c9bcdc8..e7a7e5c 100755 --- a/python/grobid_tool.py +++ b/python/grobid_tool.py @@ -17,7 +17,7 @@ def run_extract_json(args): wayback_client = WaybackClient() worker = GrobidWorker(grobid_client, wayback_client, sink=None) multi_worker = MultiprocessWrapper(worker, args.sink) - pusher = JsonBatchPusher(worker, args.json_file, batch_size=30) + pusher = JsonLinePusher(multi_worker, args.json_file, batch_size=args.jobs) pusher.run() def run_extract_cdx(args): @@ -27,7 +27,7 @@ def run_extract_cdx(args): multi_worker = MultiprocessWrapper(worker, args.sink) pusher = CdxLinePusher(multi_worker, args.cdx_file, filter_http_statuses=[200], filter_mimetypes=['application/pdf'], - batch_size=30) + batch_size=args.jobs) pusher.run() def run_extract_zipfile(args): @@ -47,6 +47,9 @@ def main(): parser.add_argument('--kafka-env', default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)") + parser.add_argument('-j', '--jobs', + default=8, + help="parallelism for batch CPU jobs") parser.add_argument('--grobid-host', default="http://grobid.qa.fatcat.wiki", help="GROBID API host/port") @@ -78,6 +81,7 @@ def main(): args.sink = None if args.kafka_mode: produce_topic = "sandcrawler-{}.grobid-output-json".format(args.kafka_env) + print("Running in kafka output mode, publishing to {}\n".format(produce_topic)) args.sink = KafkaGrobidSink(kafka_hosts=args.kafka_hosts, produce_topic=produce_topic) diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py index a610404..32addca 100644 --- a/python/sandcrawler/grobid.py +++ b/python/sandcrawler/grobid.py @@ -57,6 +57,7 @@ class GrobidWorker(SandcrawlerWorker): self.consolidate_mode = 1 def process(self, record): + self.counts['total'] += 1 if record.get('warc_path') and record.get('warc_offset'): # it's a full CDX dict. fetch using WaybackClient if not self.wayback_client: @@ -81,6 +82,7 @@ class GrobidWorker(SandcrawlerWorker): result['file_meta'] = gen_file_metadata(blob) result['source'] = record result['key'] = result['file_meta']['sha1hex'] + self.counts[result['status']] += 1 return result class GrobidBlobWorker(SandcrawlerWorker): @@ -96,9 +98,11 @@ class GrobidBlobWorker(SandcrawlerWorker): self.consolidate_mode = 1 def process(self, blob): + self.counts['total'] += 1 assert blob result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode) result['file_meta'] = gen_file_metadata(blob) result['key'] = result['file_meta']['sha1hex'] + self.counts[result['status']] += 1 return result -- cgit v1.2.3