about summary refs log tree commit diff stats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--  python/sandcrawler/workers.py  16
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 8c604fb..6b08f03 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -181,12 +181,16 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
key=default_key,
source=record,
status="empty-blob",
+ wayback_sec=wayback_sec,
+ petabox_sec=petabox_sec,
)
return dict(
key=default_key,
status="success",
source=record,
blob=blob,
+ wayback_sec=wayback_sec,
+ petabox_sec=petabox_sec,
)
@@ -219,9 +223,9 @@ class MultiprocessWrapper(SandcrawlerWorker):
self.pool.terminate()
if self.sink:
self.sink.finish()
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
- return worker_counts
+ return self.counts
class BlackholeSink(SandcrawlerWorker):
@@ -370,7 +374,7 @@ class JsonLinePusher(RecordPusher):
self.worker.push_batch(batch)
self.counts['pushed'] += len(batch)
batch = []
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -417,7 +421,7 @@ class CdxLinePusher(RecordPusher):
self.worker.push_batch(batch)
self.counts['pushed'] += len(batch)
batch = []
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -456,7 +460,7 @@ class ZipfilePusher(RecordPusher):
self.worker.push_batch(batch)
self.counts['pushed'] += len(batch)
batch = []
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -552,7 +556,7 @@ class KafkaJsonPusher(RecordPusher):
# TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
# commit the current batch if it has been lingering
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("KafkaJson lines pushed: {}".format(self.counts), file=sys.stderr)
self.consumer.close()
return self.counts