aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xpython/kafka_grobid.py16
-rwxr-xr-xpython/kafka_grobid_hbase.py17
2 files changed, 22 insertions, 11 deletions
diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py
index 2fe35ac..13fbcff 100755
--- a/python/kafka_grobid.py
+++ b/python/kafka_grobid.py
@@ -232,13 +232,14 @@ class KafkaGrobidWorker:
# way... print? log?
# 3. repeat!
- kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0")
+ print("Starting Kafka GROBID extraction worker...")
+ kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0")
produce_topic = kafka.topics[self.produce_topic]
consume_topic = kafka.topics[self.consume_topic]
- print("starting up...")
sequential_failures = 0
with produce_topic.get_producer(compression=pykafka.common.CompressionType.GZIP, sync=False) as producer:
+ print("Producing to: {}".format(self.produce_topic))
consumer = consume_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
managed=True,
@@ -246,17 +247,22 @@ class KafkaGrobidWorker:
auto_commit_enable=True,
auto_commit_interval_ms=60000, # 60 seconds
compacted_topic=True)
+ print("Consuming from: {} as {}".format(self.consume_topic, self.consumer_group))
+ sys.stdout.flush()
for msg in consumer:
- print("got a line! ")
grobid_output, status = self.do_work(msg.value.decode('utf-8'))
if grobid_output:
+ print("extracted {}: {}".format(
+ grobid_output.get('key'),
+ status))
+ sys.stdout.flush()
producer.produce(json.dumps(grobid_output).encode('utf-8'))
sequential_failures = 0
else:
- print("failed to extract: {}".format(status))
+ sys.stderr.write("failed to extract: {}\n".format(status))
sequential_failures += 1
if sequential_failures > 20:
- print("too many failures in a row, bailing out")
+ sys.stderr.write("too many failures in a row, bailing out\n")
sys.exit(-1)
diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py
index a7f2a5e..7c5c4a2 100755
--- a/python/kafka_grobid_hbase.py
+++ b/python/kafka_grobid_hbase.py
@@ -43,7 +43,7 @@ class KafkaGrobidHbaseWorker:
self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert2')
self.kafka_hosts = kafka_hosts or 'localhost:9092'
self.hbase_host = kwargs['hbase_host']
- self.hbase_table = kwargs['hbase_table']
+ self.hbase_table_name = kwargs['hbase_table']
self.hb_table = None # connection initialized in run()
def convert_tei(self, info):
@@ -127,34 +127,39 @@ class KafkaGrobidHbaseWorker:
# way... print? log?
# 3. repeat!
+ print("Starting grobid-hbase-worker...")
try:
host = self.hbase_host
hb_conn = happybase.Connection(host=host, transport="framed",
protocol="compact")
except Exception:
raise Exception("Couldn't connect to HBase using host: {}".format(host))
- self.hb_table = hb_conn.table(self.hbase_table)
+ self.hb_table = hb_conn.table(self.hbase_table_name)
+ print("HBase inserting into {}".format(self.hbase_table_name))
- kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0")
+ kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0")
consume_topic = kafka.topics[self.consume_topic]
- print("starting up...")
sequential_failures = 0
consumer = consume_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
managed=True,
auto_commit_enable=True,
compacted_topic=True)
+ print("Kafka consuming {} in group {}".format(
+ self.consume_topic,
+ self.consumer_group))
+
for msg in consumer:
#print("got a line! ")
grobid_output, status = self.do_work(msg.value.decode('utf-8'))
if grobid_output:
sequential_failures = 0
else:
- print("failed to process: {}".format(status))
+ sys.stderr.write("Failed to process GROBID extraction output: {}\n".format(status))
sequential_failures += 1
if sequential_failures > 20:
- print("too many failures in a row, bailing out")
+ sys.stderr.write("too many failures in a row, bailing out\n")
sys.exit(-1)