From 83ca181637dfc34804649e1d342e3cb3ee59b5df Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 16 Apr 2020 13:46:30 -0700 Subject: batch/multiprocess for ZipfilePusher --- python/grobid_tool.py | 10 ++++++++-- python/sandcrawler/workers.py | 21 ++++++++++++++++++--- 2 files changed, 26 insertions(+), 5 deletions(-) (limited to 'python') diff --git a/python/grobid_tool.py b/python/grobid_tool.py index dc9780d..a2d74a1 100755 --- a/python/grobid_tool.py +++ b/python/grobid_tool.py @@ -55,8 +55,14 @@ def run_extract_cdx(args): def run_extract_zipfile(args): grobid_client = GrobidClient(host_url=args.grobid_host) - worker = GrobidBlobWorker(grobid_client, sink=args.sink) - pusher = ZipfilePusher(worker, args.zip_file) + if args.jobs > 1: + print("multi-processing: {}".format(args.jobs), file=sys.stderr) + worker = GrobidBlobWorker(grobid_client, sink=None) + multi_worker = MultiprocessWrapper(worker, args.sink, jobs=args.jobs) + pusher = ZipfilePusher(multi_worker, args.zip_file, batch_size=args.jobs) + else: + worker = GrobidBlobWorker(grobid_client, sink=args.sink) + pusher = ZipfilePusher(worker, args.zip_file) pusher.run() def run_transform(args): diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 54bd581..d5db7a5 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -283,7 +283,7 @@ class CdxLinePusher(RecordPusher): continue if self.batch_size: batch.append(record) - if len(batch) > self.batch_size: + if len(batch) >= self.batch_size: self.worker.push_batch(batch) self.counts['pushed'] += len(batch) batch = [] @@ -306,8 +306,12 @@ class ZipfilePusher(RecordPusher): self.worker = worker self.filter_suffix = ".pdf" self.zipfile_path = zipfile_path + self.batch_size = kwargs.get('batch_size', None) + if self.batch_size in (0, 1): + self.batch_size = None def run(self): + batch = [] with zipfile.ZipFile(self.zipfile_path, 'r') as archive: for zipinfo in archive.infolist(): if not zipinfo.filename.endswith(self.filter_suffix): @@ -317,8 +321,19 @@ class ZipfilePusher(RecordPusher): flo = archive.open(zipinfo, 'r') data = flo.read(2**32) flo.close() - self.worker.push_record(data) - self.counts['pushed'] += 1 + if self.batch_size: + batch.append(data) + if len(batch) >= self.batch_size: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] + else: + self.worker.push_record(data) + self.counts['pushed'] += 1 + if self.batch_size and batch: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] worker_counts = self.worker.finish() print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr) return self.counts -- cgit v1.2.3