aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--python/sandcrawler/workers.py21
1 files changed, 18 insertions, 3 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 54bd581..d5db7a5 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -283,7 +283,7 @@ class CdxLinePusher(RecordPusher):
continue
if self.batch_size:
batch.append(record)
- if len(batch) > self.batch_size:
+ if len(batch) >= self.batch_size:
self.worker.push_batch(batch)
self.counts['pushed'] += len(batch)
batch = []
@@ -306,8 +306,12 @@ class ZipfilePusher(RecordPusher):
self.worker = worker
self.filter_suffix = ".pdf"
self.zipfile_path = zipfile_path
+ self.batch_size = kwargs.get('batch_size', None)
+ if self.batch_size in (0, 1):
+ self.batch_size = None
def run(self):
+ batch = []
with zipfile.ZipFile(self.zipfile_path, 'r') as archive:
for zipinfo in archive.infolist():
if not zipinfo.filename.endswith(self.filter_suffix):
@@ -317,8 +321,19 @@ class ZipfilePusher(RecordPusher):
flo = archive.open(zipinfo, 'r')
data = flo.read(2**32)
flo.close()
- self.worker.push_record(data)
- self.counts['pushed'] += 1
+ if self.batch_size:
+ batch.append(data)
+ if len(batch) >= self.batch_size:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
+ else:
+ self.worker.push_record(data)
+ self.counts['pushed'] += 1
+ if self.batch_size and batch:
+ self.worker.push_batch(batch)
+ self.counts['pushed'] += len(batch)
+ batch = []
worker_counts = self.worker.finish()
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts