about summary refs log tree commit diff stats
path: root/python/kafka_grobid_hbase.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/kafka_grobid_hbase.py')
-rwxr-xr-x python/kafka_grobid_hbase.py 17
1 files changed, 11 insertions, 6 deletions
diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py
index a7f2a5e..7c5c4a2 100755
--- a/python/kafka_grobid_hbase.py
+++ b/python/kafka_grobid_hbase.py
@@ -43,7 +43,7 @@ class KafkaGrobidHbaseWorker:
self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert2')
self.kafka_hosts = kafka_hosts or 'localhost:9092'
self.hbase_host = kwargs['hbase_host']
- self.hbase_table = kwargs['hbase_table']
+ self.hbase_table_name = kwargs['hbase_table']
self.hb_table = None # connection initialized in run()
def convert_tei(self, info):
@@ -127,34 +127,39 @@ class KafkaGrobidHbaseWorker:
# way... print? log?
# 3. repeat!
+ print("Starting grobid-hbase-worker...")
try:
host = self.hbase_host
hb_conn = happybase.Connection(host=host, transport="framed",
protocol="compact")
except Exception:
raise Exception("Couldn't connect to HBase using host: {}".format(host))
- self.hb_table = hb_conn.table(self.hbase_table)
+ self.hb_table = hb_conn.table(self.hbase_table_name)
+ print("HBase inserting into {}".format(self.hbase_table_name))
- kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0")
+ kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0")
consume_topic = kafka.topics[self.consume_topic]
- print("starting up...")
sequential_failures = 0
consumer = consume_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
managed=True,
auto_commit_enable=True,
compacted_topic=True)
+ print("Kafka consuming {} in group {}".format(
+ self.consume_topic,
+ self.consumer_group))
+
for msg in consumer:
#print("got a line! ")
grobid_output, status = self.do_work(msg.value.decode('utf-8'))
if grobid_output:
sequential_failures = 0
else:
- print("failed to process: {}".format(status))
+ sys.stderr.write("Failed to process GROBID extraction output: {}\n".format(status))
sequential_failures += 1
if sequential_failures > 20:
- print("too many failures in a row, bailing out")
+ sys.stderr.write("too many failures in a row, bailing out\n")
sys.exit(-1)