diff options
author | Bryan Newbold <bnewbold@archive.org> | 2020-04-16 13:46:30 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-04-16 13:46:30 -0700 |
commit | 83ca181637dfc34804649e1d342e3cb3ee59b5df (patch) | |
tree | a9a46b42a9ee0d2917f95159c1b9d12392f9e5cf /python | |
parent | 7243649b0171c0c02bda41ea57626ed4c0f59db0 (diff) | |
download | sandcrawler-83ca181637dfc34804649e1d342e3cb3ee59b5df.tar.gz sandcrawler-83ca181637dfc34804649e1d342e3cb3ee59b5df.zip |
batch/multiprocess for ZipfilePusher
Diffstat (limited to 'python')
-rwxr-xr-x | python/grobid_tool.py | 10 | ||||
-rw-r--r-- | python/sandcrawler/workers.py | 21 |
2 files changed, 26 insertions, 5 deletions
diff --git a/python/grobid_tool.py b/python/grobid_tool.py index dc9780d..a2d74a1 100755 --- a/python/grobid_tool.py +++ b/python/grobid_tool.py @@ -55,8 +55,14 @@ def run_extract_cdx(args): def run_extract_zipfile(args): grobid_client = GrobidClient(host_url=args.grobid_host) - worker = GrobidBlobWorker(grobid_client, sink=args.sink) - pusher = ZipfilePusher(worker, args.zip_file) + if args.jobs > 1: + print("multi-processing: {}".format(args.jobs), file=sys.stderr) + worker = GrobidBlobWorker(grobid_client, sink=None) + multi_worker = MultiprocessWrapper(worker, args.sink, jobs=args.jobs) + pusher = ZipfilePusher(multi_worker, args.zip_file, batch_size=args.jobs) + else: + worker = GrobidBlobWorker(grobid_client, sink=args.sink) + pusher = ZipfilePusher(worker, args.zip_file) pusher.run() def run_transform(args): diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 54bd581..d5db7a5 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -283,7 +283,7 @@ class CdxLinePusher(RecordPusher): continue if self.batch_size: batch.append(record) - if len(batch) > self.batch_size: + if len(batch) >= self.batch_size: self.worker.push_batch(batch) self.counts['pushed'] += len(batch) batch = [] @@ -306,8 +306,12 @@ class ZipfilePusher(RecordPusher): self.worker = worker self.filter_suffix = ".pdf" self.zipfile_path = zipfile_path + self.batch_size = kwargs.get('batch_size', None) + if self.batch_size in (0, 1): + self.batch_size = None def run(self): + batch = [] with zipfile.ZipFile(self.zipfile_path, 'r') as archive: for zipinfo in archive.infolist(): if not zipinfo.filename.endswith(self.filter_suffix): @@ -317,8 +321,19 @@ class ZipfilePusher(RecordPusher): flo = archive.open(zipinfo, 'r') data = flo.read(2**32) flo.close() - self.worker.push_record(data) - self.counts['pushed'] += 1 + if self.batch_size: + batch.append(data) + if len(batch) >= self.batch_size: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] + else: + self.worker.push_record(data) + self.counts['pushed'] += 1 + if self.batch_size and batch: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] worker_counts = self.worker.finish() print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr) return self.counts |