diff options
author | Bryan Newbold <bnewbold@archive.org> | 2020-04-16 13:46:30 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2020-04-16 13:46:30 -0700 |
commit | 83ca181637dfc34804649e1d342e3cb3ee59b5df (patch) | |
tree | a9a46b42a9ee0d2917f95159c1b9d12392f9e5cf /python/sandcrawler/workers.py | |
parent | 7243649b0171c0c02bda41ea57626ed4c0f59db0 (diff) | |
download | sandcrawler-83ca181637dfc34804649e1d342e3cb3ee59b5df.tar.gz sandcrawler-83ca181637dfc34804649e1d342e3cb3ee59b5df.zip |
batch/multiprocess for ZipfilePusher
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r-- | python/sandcrawler/workers.py | 21 |
1 files changed, 18 insertions, 3 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 54bd581..d5db7a5 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -283,7 +283,7 @@ class CdxLinePusher(RecordPusher): continue if self.batch_size: batch.append(record) - if len(batch) > self.batch_size: + if len(batch) >= self.batch_size: self.worker.push_batch(batch) self.counts['pushed'] += len(batch) batch = [] @@ -306,8 +306,12 @@ class ZipfilePusher(RecordPusher): self.worker = worker self.filter_suffix = ".pdf" self.zipfile_path = zipfile_path + self.batch_size = kwargs.get('batch_size', None) + if self.batch_size in (0, 1): + self.batch_size = None def run(self): + batch = [] with zipfile.ZipFile(self.zipfile_path, 'r') as archive: for zipinfo in archive.infolist(): if not zipinfo.filename.endswith(self.filter_suffix): @@ -317,8 +321,19 @@ class ZipfilePusher(RecordPusher): flo = archive.open(zipinfo, 'r') data = flo.read(2**32) flo.close() - self.worker.push_record(data) - self.counts['pushed'] += 1 + if self.batch_size: + batch.append(data) + if len(batch) >= self.batch_size: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] + else: + self.worker.push_record(data) + self.counts['pushed'] += 1 + if self.batch_size and batch: + self.worker.push_batch(batch) + self.counts['pushed'] += len(batch) + batch = [] worker_counts = self.worker.finish() print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr) return self.counts |