aboutsummaryrefslogtreecommitdiffstats
path: root/python/sandcrawler/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--python/sandcrawler/workers.py42
1 files changed, 22 insertions, 20 deletions
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 25d567f..a8b03c7 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -47,7 +47,7 @@ class SandcrawlerWorker(object):
def finish(self):
if self.sink:
self.sink.finish()
- sys.stderr.write("Worker: {}\n".format(self.counts))
+ print("Worker: {}".format(self.counts), file=sys.stderr)
return self.counts
class MultiprocessWrapper(SandcrawlerWorker):
@@ -60,7 +60,7 @@ class MultiprocessWrapper(SandcrawlerWorker):
def push_batch(self, tasks):
self.counts['total'] += len(tasks)
- sys.stderr.write("... processing batch of: {}\n".format(len(tasks)))
+ print("... processing batch of: {}".format(len(tasks)), file=sys.stderr)
results = self.pool.map(self.worker.process, tasks)
for result in results:
if not result:
@@ -81,7 +81,7 @@ class MultiprocessWrapper(SandcrawlerWorker):
if self.sink:
self.sink.finish()
worker_counts = self.worker.finish()
- sys.stderr.write("Multiprocessing: {}\n".format(self.counts))
+ print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
return worker_counts
class BlackholeSink(SandcrawlerWorker):
@@ -117,8 +117,8 @@ class KafkaSink(SandcrawlerWorker):
@staticmethod
def _fail_fast(err, msg):
if err is not None:
- sys.stderr.write("Kafka producer delivery error: {}\n".format(err))
- sys.stderr.write("Bailing out...\n")
+ print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
@@ -234,7 +234,7 @@ class JsonLinePusher(RecordPusher):
self.counts['pushed'] += len(batch)
batch = []
worker_counts = self.worker.finish()
- sys.stderr.write("JSON lines pushed: {}\n".format(self.counts))
+ print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -281,7 +281,7 @@ class CdxLinePusher(RecordPusher):
self.counts['pushed'] += len(batch)
batch = []
worker_counts = self.worker.finish()
- sys.stderr.write("CDX lines pushed: {}\n".format(self.counts))
+ print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -306,7 +306,7 @@ class ZipfilePusher(RecordPusher):
self.worker.push_record(data)
self.counts['pushed'] += 1
worker_counts = self.worker.finish()
- sys.stderr.write("ZIP PDFs pushed: {}\n".format(self.counts))
+ print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -338,8 +338,9 @@ class KafkaJsonPusher(RecordPusher):
batch = self.consumer.consume(
num_messages=self.batch_size,
timeout=self.poll_interval)
- sys.stderr.write("... got {} kafka messages ({}sec poll interval)\n".format(
- len(batch), self.poll_interval))
+ print("... got {} kafka messages ({}sec poll interval)".format(
+ len(batch), self.poll_interval),
+ file=sys.stderr)
if not batch:
# TODO: could have some larger timeout here and
# self.worker.finish() if it's been more than, eg, a couple
@@ -355,7 +356,7 @@ class KafkaJsonPusher(RecordPusher):
records = [json.loads(msg.value().decode('utf-8')) for msg in batch]
self.worker.push_batch(records)
self.counts['pushed'] += len(batch)
- sys.stderr.write("Import counts: {}\n".format(self.worker.counts))
+ print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
else:
for msg in batch:
self.counts['total'] += 1
@@ -363,7 +364,7 @@ class KafkaJsonPusher(RecordPusher):
self.worker.push_record(record)
self.counts['pushed'] += 1
if self.counts['total'] % 500 == 0:
- sys.stderr.write("Import counts: {}\n".format(self.worker.counts))
+ print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
for msg in batch:
# locally store offsets of processed messages; will be
# auto-commited by librdkafka from this "stored" value
@@ -372,7 +373,7 @@ class KafkaJsonPusher(RecordPusher):
# TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
# commit the current batch if it has been lingering
worker_counts = self.worker.finish()
- sys.stderr.write("KafkaJson lines pushed: {}\n".format(self.counts))
+ print("KafkaJson lines pushed: {}".format(self.counts), file=sys.stderr)
self.consumer.close()
return self.counts
@@ -382,15 +383,15 @@ def make_kafka_consumer(hosts, consume_topic, group):
def fail_fast(err, partitions):
if err is not None:
- sys.stderr.write("Kafka consumer commit error: {}\n".format(err))
- sys.stderr.write("Bailing out...\n")
+ print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
for p in partitions:
# check for partition-specific commit errors
if p.error:
- sys.stderr.write("Kafka consumer commit error: {}\n".format(p.error))
- sys.stderr.write("Bailing out...\n")
+ print("Kafka consumer commit error: {}".format(p.error), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
#print("Kafka consumer commit successful")
@@ -419,8 +420,9 @@ def make_kafka_consumer(hosts, consume_topic, group):
for p in partitions:
if p.error:
raise KafkaException(p.error)
- sys.stderr.write("Kafka partitions rebalanced: {} / {}\n".format(
- consumer, partitions))
+ print("Kafka partitions rebalanced: {} / {}".format(
+ consumer, partitions),
+ file=sys.stderr)
consumer = Consumer(conf)
# NOTE: it's actually important that topic_name *not* be bytes (UTF-8
@@ -429,5 +431,5 @@ def make_kafka_consumer(hosts, consume_topic, group):
on_assign=on_rebalance,
on_revoke=on_rebalance,
)
- sys.stderr.write("Consuming from kafka topic {}, group {}\n".format(topic_name, group))
+ print("Consuming from kafka topic {}, group {}".format(topic_name, group), file=sys.stderr)
return consumer