From 83ca181637dfc34804649e1d342e3cb3ee59b5df Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Thu, 16 Apr 2020 13:46:30 -0700 Subject: batch/multiprocess for ZipfilePusher --- python/sandcrawler/workers.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) (limited to 'python/sandcrawler/workers.py') diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 54bd581..d5db7a5 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -283,7 +283,7 @@ class CdxLinePusher(RecordPusher): continue if self.batch_size: batch.append(record) - if len(batch) > self.batch_size: + if len(batch) >= self.batch_size: self.worker.push_batch(batch) self.counts['pushed'] += len(batch) batch = [] @@ -306,8 +306,12 @@ class ZipfilePusher(RecordPusher): self.worker = worker self.filter_suffix = ".pdf" self.zipfile_path = zipfile_path + self.batch_size = kwargs.get('batch_size', None) + if self.batch_size in (0, 1): + self.batch_size = None def run(self): + batch = [] with zipfile.ZipFile(self.zipfile_path, 'r') as archive: for zipinfo in archive.infolist(): if not zipinfo.filename.endswith(self.filter_suffix): @@ -317,8 +321,19 @@ class ZipfilePusher(RecordPusher): flo = archive.open(zipinfo, 'r') data = flo.read(2**32) flo.close() - self.worker.push_record(data) - self.counts['pushed'] += 1 + if self.batch_size: + batch.append(data) + if len(batch) >= self.batch_size: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] + else: + self.worker.push_record(data) + self.counts['pushed'] += 1 + if self.batch_size and batch: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] worker_counts = self.worker.finish() print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr) return self.counts -- cgit v1.2.3