aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2021-10-26 13:54:35 -0700
committerBryan Newbold <bnewbold@archive.org>2021-10-26 13:54:35 -0700
commit02cac8f857fe21474ab25aa7150bed2ac5b970d5 (patch)
tree5163390339c446257f4240579e14e81588fd3632 /python/sandcrawler/workers.py
parent6650f3862b87bdeac4f3bb9d3561f934858956a0 (diff)
downloadsandcrawler-02cac8f857fe21474ab25aa7150bed2ac5b970d5.tar.gz
sandcrawler-02cac8f857fe21474ab25aa7150bed2ac5b970d5.zip
flake8 clean (with current settings)
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--python/sandcrawler/workers.py16
1 files changed, 10 insertions, 6 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 8c604fb..6b08f03 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -181,12 +181,16 @@ class SandcrawlerFetchWorker(SandcrawlerWorker):
key=default_key,
source=record,
status="empty-blob",
+ wayback_sec=wayback_sec,
+ petabox_sec=petabox_sec,
)
return dict(
key=default_key,
status="success",
source=record,
blob=blob,
+ wayback_sec=wayback_sec,
+ petabox_sec=petabox_sec,
)
@@ -219,9 +223,9 @@ class MultiprocessWrapper(SandcrawlerWorker):
self.pool.terminate()
if self.sink:
self.sink.finish()
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
- return worker_counts
+ return self.counts
class BlackholeSink(SandcrawlerWorker):
@@ -370,7 +374,7 @@ class JsonLinePusher(RecordPusher):
self.worker.push_batch(batch)
self.counts['pushed'] += len(batch)
batch = []
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -417,7 +421,7 @@ class CdxLinePusher(RecordPusher):
self.worker.push_batch(batch)
self.counts['pushed'] += len(batch)
batch = []
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -456,7 +460,7 @@ class ZipfilePusher(RecordPusher):
self.worker.push_batch(batch)
self.counts['pushed'] += len(batch)
batch = []
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -552,7 +556,7 @@ class KafkaJsonPusher(RecordPusher):
# TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
# commit the current batch if it has been lingering
- worker_counts = self.worker.finish()
+ self.worker.finish()
print("KafkaJson lines pushed: {}".format(self.counts), file=sys.stderr)
self.consumer.close()
return self.counts