Diffstat (limited to 'python/sandcrawler/workers.py')
-rw-r--r--  python/sandcrawler/workers.py  42
1 file changed, 22 insertions(+), 20 deletions(-)
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 25d567f..a8b03c7 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -47,7 +47,7 @@ class SandcrawlerWorker(object):
     def finish(self):
         if self.sink:
             self.sink.finish()
-        sys.stderr.write("Worker: {}\n".format(self.counts))
+        print("Worker: {}".format(self.counts), file=sys.stderr)
         return self.counts
 
 class MultiprocessWrapper(SandcrawlerWorker):
@@ -60,7 +60,7 @@ class MultiprocessWrapper(SandcrawlerWorker):
 
     def push_batch(self, tasks):
         self.counts['total'] += len(tasks)
-        sys.stderr.write("... processing batch of: {}\n".format(len(tasks)))
+        print("... processing batch of: {}".format(len(tasks)), file=sys.stderr)
         results = self.pool.map(self.worker.process, tasks)
         for result in results:
             if not result:
@@ -81,7 +81,7 @@ class MultiprocessWrapper(SandcrawlerWorker):
         if self.sink:
             self.sink.finish()
         worker_counts = self.worker.finish()
-        sys.stderr.write("Multiprocessing: {}\n".format(self.counts))
+        print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
         return worker_counts
 
 class BlackholeSink(SandcrawlerWorker):
@@ -117,8 +117,8 @@ class KafkaSink(SandcrawlerWorker):
     @staticmethod
     def _fail_fast(err, msg):
         if err is not None:
-            sys.stderr.write("Kafka producer delivery error: {}\n".format(err))
-            sys.stderr.write("Bailing out...\n")
+            print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+            print("Bailing out...", file=sys.stderr)
             # TODO: should it be sys.exit(-1)?
             raise KafkaException(err)
 
@@ -234,7 +234,7 @@ class JsonLinePusher(RecordPusher):
             self.counts['pushed'] += len(batch)
             batch = []
         worker_counts = self.worker.finish()
-        sys.stderr.write("JSON lines pushed: {}\n".format(self.counts))
+        print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
         return self.counts
 
 
@@ -281,7 +281,7 @@ class CdxLinePusher(RecordPusher):
             self.counts['pushed'] += len(batch)
             batch = []
         worker_counts = self.worker.finish()
-        sys.stderr.write("CDX lines pushed: {}\n".format(self.counts))
+        print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
         return self.counts
 
 
@@ -306,7 +306,7 @@ class ZipfilePusher(RecordPusher):
                 self.worker.push_record(data)
                 self.counts['pushed'] += 1
         worker_counts = self.worker.finish()
-        sys.stderr.write("ZIP PDFs pushed: {}\n".format(self.counts))
+        print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
         return self.counts
 
 
@@ -338,8 +338,9 @@ class KafkaJsonPusher(RecordPusher):
             batch = self.consumer.consume(
                 num_messages=self.batch_size,
                 timeout=self.poll_interval)
-            sys.stderr.write("... got {} kafka messages ({}sec poll interval)\n".format(
-                len(batch), self.poll_interval))
+            print("... got {} kafka messages ({}sec poll interval)".format(
+                    len(batch), self.poll_interval),
+                file=sys.stderr)
             if not batch:
                 # TODO: could have some larger timeout here and
                 # self.worker.finish() if it's been more than, eg, a couple
@@ -355,7 +356,7 @@
                 records = [json.loads(msg.value().decode('utf-8')) for msg in batch]
                 self.worker.push_batch(records)
                 self.counts['pushed'] += len(batch)
-                sys.stderr.write("Import counts: {}\n".format(self.worker.counts))
+                print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
             else:
                 for msg in batch:
                     self.counts['total'] += 1
@@ -363,7 +364,7 @@
                     self.worker.push_record(record)
                     self.counts['pushed'] += 1
                     if self.counts['total'] % 500 == 0:
-                        sys.stderr.write("Import counts: {}\n".format(self.worker.counts))
+                        print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
             for msg in batch:
                 # locally store offsets of processed messages; will be
                 # auto-commited by librdkafka from this "stored" value
@@ -372,7 +373,7 @@
         # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
         # commit the current batch if it has been lingering
         worker_counts = self.worker.finish()
-        sys.stderr.write("KafkaJson lines pushed: {}\n".format(self.counts))
+        print("KafkaJson lines pushed: {}".format(self.counts), file=sys.stderr)
         self.consumer.close()
         return self.counts
 
@@ -382,15 +383,15 @@ def make_kafka_consumer(hosts, consume_topic, group):
 
     def fail_fast(err, partitions):
         if err is not None:
-            sys.stderr.write("Kafka consumer commit error: {}\n".format(err))
-            sys.stderr.write("Bailing out...\n")
+            print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
+            print("Bailing out...", file=sys.stderr)
             # TODO: should it be sys.exit(-1)?
             raise KafkaException(err)
         for p in partitions:
             # check for partition-specific commit errors
             if p.error:
-                sys.stderr.write("Kafka consumer commit error: {}\n".format(p.error))
-                sys.stderr.write("Bailing out...\n")
+                print("Kafka consumer commit error: {}".format(p.error), file=sys.stderr)
+                print("Bailing out...", file=sys.stderr)
                 # TODO: should it be sys.exit(-1)?
                 raise KafkaException(err)
         #print("Kafka consumer commit successful")
@@ -419,8 +420,9 @@ def make_kafka_consumer(hosts, consume_topic, group):
         for p in partitions:
             if p.error:
                 raise KafkaException(p.error)
-        sys.stderr.write("Kafka partitions rebalanced: {} / {}\n".format(
-            consumer, partitions))
+        print("Kafka partitions rebalanced: {} / {}".format(
+                consumer, partitions),
+            file=sys.stderr)
 
     consumer = Consumer(conf)
     # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
@@ -429,5 +431,5 @@
         on_assign=on_rebalance,
         on_revoke=on_rebalance,
     )
-    sys.stderr.write("Consuming from kafka topic {}, group {}\n".format(topic_name, group))
+    print("Consuming from kafka topic {}, group {}".format(topic_name, group), file=sys.stderr)
     return consumer
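
The change applied throughout is mechanical: every sys.stderr.write("...\n".format(...)) becomes print("...".format(...), file=sys.stderr), with print() supplying the trailing newline. A minimal sketch of the equivalence (the sample counts dict is illustrative, not taken from the patch):

    import sys

    counts = {'total': 42, 'pushed': 40}

    # before: direct file-object write; the newline must be included by hand
    sys.stderr.write("Worker: {}\n".format(counts))

    # after: print() appends the newline and routes output via the file=
    # keyword; both statements produce identical bytes on stderr
    print("Worker: {}".format(counts), file=sys.stderr)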
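For context on the _fail_fast callbacks the diff touches: with confluent-kafka, a producer delivery-report callback receives (err, msg) and is where failed produces surface. A rough sketch of the wiring, assuming a local broker and a hypothetical 'test-topic' (neither is from this repo):

    import sys
    from confluent_kafka import Producer, KafkaException

    def fail_fast(err, msg):
        # delivery report: err is non-None if the broker rejected the message
        if err is not None:
            print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
            raise KafkaException(err)

    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    producer.produce('test-topic', b'{"status": "ok"}', on_delivery=fail_fast)
    # flush() blocks until outstanding delivery reports fire, so errors
    # raised in the callback propagate here rather than being lost
    producer.flush()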
