aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2020-04-16 13:46:30 -0700
committerBryan Newbold <bnewbold@archive.org>2020-04-16 13:46:30 -0700
commit83ca181637dfc34804649e1d342e3cb3ee59b5df (patch)
treea9a46b42a9ee0d2917f95159c1b9d12392f9e5cf /python
parent7243649b0171c0c02bda41ea57626ed4c0f59db0 (diff)
downloadsandcrawler-83ca181637dfc34804649e1d342e3cb3ee59b5df.tar.gz
sandcrawler-83ca181637dfc34804649e1d342e3cb3ee59b5df.zip
batch/multiprocess for ZipfilePusher
Diffstat (limited to 'python')
-rwxr-xr-xpython/grobid_tool.py10
-rw-r--r--python/sandcrawler/workers.py21
2 files changed, 26 insertions, 5 deletions
diff --git a/python/grobid_tool.py b/python/grobid_tool.py
index dc9780d..a2d74a1 100755
--- a/python/grobid_tool.py
+++ b/python/grobid_tool.py
@@ -55,8 +55,14 @@ def run_extract_cdx(args):
def run_extract_zipfile(args):
grobid_client = GrobidClient(host_url=args.grobid_host)
- worker = GrobidBlobWorker(grobid_client, sink=args.sink)
- pusher = ZipfilePusher(worker, args.zip_file)
+ if args.jobs > 1:
+ print("multi-processing: {}".format(args.jobs), file=sys.stderr)
+ worker = GrobidBlobWorker(grobid_client, sink=None)
+ multi_worker = MultiprocessWrapper(worker, args.sink, jobs=args.jobs)
+ pusher = ZipfilePusher(multi_worker, args.zip_file, batch_size=args.jobs)
+ else:
+ worker = GrobidBlobWorker(grobid_client, sink=args.sink)
+ pusher = ZipfilePusher(worker, args.zip_file)
pusher.run()
def run_transform(args):
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 54bd581..d5db7a5 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -283,7 +283,7 @@ class CdxLinePusher(RecordPusher):
continue
if self.batch_size:
batch.append(record)
- if len(batch) > self.batch_size:
+ if len(batch) >= self.batch_size:
self.worker.push_batch(batch)
self.counts['pushed'] += len(batch)
batch = []
@@ -306,8 +306,12 @@ class ZipfilePusher(RecordPusher):
self.worker = worker
self.filter_suffix = ".pdf"
self.zipfile_path = zipfile_path
+ self.batch_size = kwargs.get('batch_size', None)
+ if self.batch_size in (0, 1):
+ self.batch_size = None
def run(self):
+ batch = []
with zipfile.ZipFile(self.zipfile_path, 'r') as archive:
for zipinfo in archive.infolist():
if not zipinfo.filename.endswith(self.filter_suffix):
@@ -317,8 +321,19 @@ class ZipfilePusher(RecordPusher):
flo = archive.open(zipinfo, 'r')
data = flo.read(2**32)
flo.close()
- self.worker.push_record(data)
- self.counts['pushed'] += 1
+ if self.batch_size:
+ batch.append(data)
+ if len(batch) >= self.batch_size:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ else:
+ self.worker.push_record(data)
+ self.counts['pushed'] += 1
+ if self.batch_size and batch:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
worker_counts = self.worker.finish()
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts