From 22b8f10bf94cdd4729095b48f3de932fa62cf8a4 Mon Sep 17 00:00:00 2001
From: Bryan Newbold <bnewbold@archive.org>
Date: Mon, 3 Dec 2018 16:48:36 -0800
Subject: improvements to Kafka GROBID worker logging
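
Print startup information, the Kafka topics in use, and each successful
extraction to stdout, and write failure messages to stderr instead of
stdout. Also bump the pykafka broker_version hint from 1.0.0 to 2.0.0 and
rename the hbase_table attribute to hbase_table_name to distinguish the
table name from hb_table, which holds the table connection.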

---
 python/kafka_grobid.py       | 16 +++++++++++-----
 python/kafka_grobid_hbase.py | 17 +++++++++++------
 2 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py
index 2fe35ac..13fbcff 100755
--- a/python/kafka_grobid.py
+++ b/python/kafka_grobid.py
@@ -232,13 +232,14 @@ class KafkaGrobidWorker:
         #    way... print? log?
         # 3. repeat!
 
-        kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0")
+        print("Starting Kafka GROBID extraction worker...")
+        kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0")
         produce_topic = kafka.topics[self.produce_topic]
         consume_topic = kafka.topics[self.consume_topic]
 
-        print("starting up...")
         sequential_failures = 0
         with produce_topic.get_producer(compression=pykafka.common.CompressionType.GZIP, sync=False) as producer:
+            print("Producing to: {}".format(self.produce_topic))
             consumer = consume_topic.get_balanced_consumer(
                 consumer_group=self.consumer_group,
                 managed=True,
@@ -246,17 +247,22 @@ class KafkaGrobidWorker:
                 auto_commit_enable=True,
                 auto_commit_interval_ms=60000, # 60 seconds
                 compacted_topic=True)
+            print("Consuming from: {} as {}".format(self.consume_topic, self.consumer_group))
+            sys.stdout.flush()
             for msg in consumer:
-                print("got a line! ")
                 grobid_output, status = self.do_work(msg.value.decode('utf-8'))
                 if grobid_output:
+                    print("extracted {}: {}".format(
+                        grobid_output.get('key'),
+                        status))
+                    sys.stdout.flush()
                     producer.produce(json.dumps(grobid_output).encode('utf-8'))
                     sequential_failures = 0
                 else:
-                    print("failed to extract: {}".format(status))
+                    sys.stderr.write("failed to extract: {}\n".format(status))
                     sequential_failures += 1
                     if sequential_failures > 20:
-                        print("too many failures in a row, bailing out")
+                        sys.stderr.write("too many failures in a row, bailing out\n")
                         sys.exit(-1)
 
 
diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py
index a7f2a5e..7c5c4a2 100755
--- a/python/kafka_grobid_hbase.py
+++ b/python/kafka_grobid_hbase.py
@@ -43,7 +43,7 @@ class KafkaGrobidHbaseWorker:
         self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert2')
         self.kafka_hosts = kafka_hosts or 'localhost:9092'
         self.hbase_host = kwargs['hbase_host']
-        self.hbase_table = kwargs['hbase_table']
+        self.hbase_table_name = kwargs['hbase_table']
         self.hb_table = None  # connection initialized in run()
 
     def convert_tei(self, info):
@@ -127,34 +127,39 @@ class KafkaGrobidHbaseWorker:
         #    way... print? log?
         # 3. repeat!
 
+        print("Starting grobid-hbase-worker...")
         try:
             host = self.hbase_host
             hb_conn = happybase.Connection(host=host, transport="framed",
                 protocol="compact")
         except Exception:
             raise Exception("Couldn't connect to HBase using host: {}".format(host))
-        self.hb_table = hb_conn.table(self.hbase_table)
+        self.hb_table = hb_conn.table(self.hbase_table_name)
+        print("HBase inserting into {}".format(self.hbase_table_name))
 
-        kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0")
+        kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0")
         consume_topic = kafka.topics[self.consume_topic]
 
-        print("starting up...")
         sequential_failures = 0
         consumer = consume_topic.get_balanced_consumer(
             consumer_group=self.consumer_group,
             managed=True,
             auto_commit_enable=True,
             compacted_topic=True)
+        print("Kafka consuming {} in group {}".format(
+            self.consume_topic,
+            self.consumer_group))
+
         for msg in consumer:
             #print("got a line! ")
             grobid_output, status = self.do_work(msg.value.decode('utf-8'))
             if grobid_output:
                 sequential_failures = 0
             else:
-                print("failed to process: {}".format(status))
+                sys.stderr.write("Failed to process GROBID extraction output: {}\n".format(status))
                 sequential_failures += 1
                 if sequential_failures > 20:
-                    print("too many failures in a row, bailing out")
+                    sys.stderr.write("too many failures in a row, bailing out\n")
                     sys.exit(-1)
 
 
-- 
cgit v1.2.3