Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r-- | python/sandcrawler/workers.py | 42 | ++++++++++++++++++++++--------------------
1 file changed, 22 insertions(+), 20 deletions(-)
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 25d567f..a8b03c7 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -47,7 +47,7 @@ class SandcrawlerWorker(object):
     def finish(self):
         if self.sink:
             self.sink.finish()
-        sys.stderr.write("Worker: {}\n".format(self.counts))
+        print("Worker: {}".format(self.counts), file=sys.stderr)
         return self.counts
 
 class MultiprocessWrapper(SandcrawlerWorker):
@@ -60,7 +60,7 @@ class MultiprocessWrapper(SandcrawlerWorker):
 
     def push_batch(self, tasks):
         self.counts['total'] += len(tasks)
-        sys.stderr.write("... processing batch of: {}\n".format(len(tasks)))
+        print("... processing batch of: {}".format(len(tasks)), file=sys.stderr)
         results = self.pool.map(self.worker.process, tasks)
         for result in results:
             if not result:
@@ -81,7 +81,7 @@ class MultiprocessWrapper(SandcrawlerWorker):
         if self.sink:
             self.sink.finish()
         worker_counts = self.worker.finish()
-        sys.stderr.write("Multiprocessing: {}\n".format(self.counts))
+        print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
         return worker_counts
 
 class BlackholeSink(SandcrawlerWorker):
@@ -117,8 +117,8 @@ class KafkaSink(SandcrawlerWorker):
     @staticmethod
     def _fail_fast(err, msg):
         if err is not None:
-            sys.stderr.write("Kafka producer delivery error: {}\n".format(err))
-            sys.stderr.write("Bailing out...\n")
+            print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+            print("Bailing out...", file=sys.stderr)
             # TODO: should it be sys.exit(-1)?
             raise KafkaException(err)
 
@@ -234,7 +234,7 @@ class JsonLinePusher(RecordPusher):
             self.counts['pushed'] += len(batch)
             batch = []
         worker_counts = self.worker.finish()
-        sys.stderr.write("JSON lines pushed: {}\n".format(self.counts))
+        print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
         return self.counts
 
 
@@ -281,7 +281,7 @@ class CdxLinePusher(RecordPusher):
             self.counts['pushed'] += len(batch)
             batch = []
         worker_counts = self.worker.finish()
-        sys.stderr.write("CDX lines pushed: {}\n".format(self.counts))
+        print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
         return self.counts
 
 
@@ -306,7 +306,7 @@ class ZipfilePusher(RecordPusher):
                 self.worker.push_record(data)
                 self.counts['pushed'] += 1
         worker_counts = self.worker.finish()
-        sys.stderr.write("ZIP PDFs pushed: {}\n".format(self.counts))
+        print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
         return self.counts
 
 
@@ -338,8 +338,9 @@ class KafkaJsonPusher(RecordPusher):
             batch = self.consumer.consume(
                 num_messages=self.batch_size,
                 timeout=self.poll_interval)
-            sys.stderr.write("... got {} kafka messages ({}sec poll interval)\n".format(
-                len(batch), self.poll_interval))
+            print("... got {} kafka messages ({}sec poll interval)".format(
+                len(batch), self.poll_interval),
+                file=sys.stderr)
             if not batch:
                 # TODO: could have some larger timeout here and
                 # self.worker.finish() if it's been more than, eg, a couple
@@ -355,7 +356,7 @@
                 records = [json.loads(msg.value().decode('utf-8')) for msg in batch]
                 self.worker.push_batch(records)
                 self.counts['pushed'] += len(batch)
-                sys.stderr.write("Import counts: {}\n".format(self.worker.counts))
+                print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
             else:
                 for msg in batch:
                     self.counts['total'] += 1
@@ -363,7 +364,7 @@
                     self.worker.push_record(record)
                     self.counts['pushed'] += 1
                     if self.counts['total'] % 500 == 0:
-                        sys.stderr.write("Import counts: {}\n".format(self.worker.counts))
+                        print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
             for msg in batch:
                 # locally store offsets of processed messages; will be
                 # auto-commited by librdkafka from this "stored" value
@@ -372,7 +373,7 @@
         # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
         # commit the current batch if it has been lingering
         worker_counts = self.worker.finish()
-        sys.stderr.write("KafkaJson lines pushed: {}\n".format(self.counts))
+        print("KafkaJson lines pushed: {}".format(self.counts), file=sys.stderr)
         self.consumer.close()
         return self.counts
 
@@ -382,15 +383,15 @@ def make_kafka_consumer(hosts, consume_topic, group):
     def fail_fast(err, partitions):
         if err is not None:
-            sys.stderr.write("Kafka consumer commit error: {}\n".format(err))
-            sys.stderr.write("Bailing out...\n")
+            print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
+            print("Bailing out...", file=sys.stderr)
             # TODO: should it be sys.exit(-1)?
             raise KafkaException(err)
         for p in partitions:
             # check for partition-specific commit errors
             if p.error:
-                sys.stderr.write("Kafka consumer commit error: {}\n".format(p.error))
-                sys.stderr.write("Bailing out...\n")
+                print("Kafka consumer commit error: {}".format(p.error), file=sys.stderr)
+                print("Bailing out...", file=sys.stderr)
                 # TODO: should it be sys.exit(-1)?
                 raise KafkaException(err)
         #print("Kafka consumer commit successful")
 
@@ -419,8 +420,9 @@ def make_kafka_consumer(hosts, consume_topic, group):
         for p in partitions:
             if p.error:
                 raise KafkaException(p.error)
-        sys.stderr.write("Kafka partitions rebalanced: {} / {}\n".format(
-            consumer, partitions))
+        print("Kafka partitions rebalanced: {} / {}".format(
+            consumer, partitions),
+            file=sys.stderr)
 
     consumer = Consumer(conf)
     # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
@@ -429,5 +431,5 @@
         on_assign=on_rebalance,
         on_revoke=on_rebalance,
     )
-    sys.stderr.write("Consuming from kafka topic {}, group {}\n".format(topic_name, group))
+    print("Consuming from kafka topic {}, group {}".format(topic_name, group), file=sys.stderr)
     return consumer
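Every hunk above is the same mechanical substitution: sys.stderr.write("...\n".format(...)) becomes print("...".format(...), file=sys.stderr), dropping the explicit trailing newline that print() adds itself. A minimal standalone sketch of the equivalence (the counts dict is a stand-in for the workers' real counters):

    import sys

    counts = {'total': 42, 'pushed': 40}  # stand-in for a worker's counts

    # old form: manual formatting with an explicit trailing newline
    sys.stderr.write("Worker: {}\n".format(counts))

    # new form: print() appends the newline and targets stderr via file=
    print("Worker: {}".format(counts), file=sys.stderr)

Both calls write the same bytes to stderr; print() additionally handles multiple arguments and non-string values, which write() does not.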
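The two fail-fast callbacks touched here, KafkaSink._fail_fast and the fail_fast closure in make_kafka_consumer, both bail on any error confluent_kafka delivers to a callback (delivery reports for the producer, commit results for the consumer). A rough sketch of how the consumer-side commit callback gets wired up, assuming a hypothetical broker address and group id, and raising the per-partition p.error where the unchanged context line above re-raises the outer err:

    import sys
    from confluent_kafka import Consumer, KafkaException

    def fail_fast(err, partitions):
        # on_commit callback: err reports a commit-level failure,
        # p.error a partition-level one
        if err is not None:
            print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
            raise KafkaException(err)
        for p in partitions:
            if p.error:
                print("Kafka consumer commit error: {}".format(p.error), file=sys.stderr)
                raise KafkaException(p.error)  # the original re-raises err here

    conf = {
        'bootstrap.servers': 'localhost:9092',  # hypothetical broker
        'group.id': 'example-group',            # hypothetical consumer group
        'on_commit': fail_fast,
        'enable.auto.offset.store': False,      # offsets stored manually after processing
    }
    consumer = Consumer(conf)

An exception raised inside the callback surfaces on the next consume()/poll() call, which is what lets the pusher loop bail out rather than silently dropping a failed commit.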