diff options
author | Bryan Newbold <bnewbold@archive.org> | 2018-12-03 16:48:36 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2018-12-03 16:50:15 -0800 |
commit | 22b8f10bf94cdd4729095b48f3de932fa62cf8a4 (patch) | |
tree | 94c6ba07cc6555d6290fbf59290065c63e06252d /python/kafka_grobid.py | |
parent | 08e0bee3570bd3a5e846c92276fd339ce66af059 (diff) | |
download | sandcrawler-22b8f10bf94cdd4729095b48f3de932fa62cf8a4.tar.gz sandcrawler-22b8f10bf94cdd4729095b48f3de932fa62cf8a4.zip |
improvements to Kafka GROBID worker logging
Diffstat (limited to 'python/kafka_grobid.py')
-rwxr-xr-x | python/kafka_grobid.py | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py index 2fe35ac..13fbcff 100755 --- a/python/kafka_grobid.py +++ b/python/kafka_grobid.py @@ -232,13 +232,14 @@ class KafkaGrobidWorker: # way... print? log? # 3. repeat! - kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0") + print("Starting Kafka GROBID extraction worker...") + kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0") produce_topic = kafka.topics[self.produce_topic] consume_topic = kafka.topics[self.consume_topic] - print("starting up...") sequential_failures = 0 with produce_topic.get_producer(compression=pykafka.common.CompressionType.GZIP, sync=False) as producer: + print("Producing to: {}".format(self.produce_topic)) consumer = consume_topic.get_balanced_consumer( consumer_group=self.consumer_group, managed=True, @@ -246,17 +247,22 @@ class KafkaGrobidWorker: auto_commit_enable=True, auto_commit_interval_ms=60000, # 60 seconds compacted_topic=True) + print("Consuming from: {} as {}".format(self.consume_topic, self.consumer_group)) + sys.stdout.flush() for msg in consumer: - print("got a line! ") grobid_output, status = self.do_work(msg.value.decode('utf-8')) if grobid_output: + print("extracted {}: {}".format( + grobid_output.get('key'), + status)) + sys.stdout.flush() producer.produce(json.dumps(grobid_output).encode('utf-8')) sequential_failures = 0 else: - print("failed to extract: {}".format(status)) + sys.stderr.write("failed to extract: {}\n".format(status)) sequential_failures += 1 if sequential_failures > 20: - print("too many failures in a row, bailing out") + sys.stderr.write("too many failures in a row, bailing out\n") sys.exit(-1) |