aboutsummaryrefslogtreecommitdiffstats
path: root/python/kafka_grobid.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/kafka_grobid.py')
-rwxr-xr-xpython/kafka_grobid.py16
1 files changed, 11 insertions, 5 deletions
diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py
index 2fe35ac..13fbcff 100755
--- a/python/kafka_grobid.py
+++ b/python/kafka_grobid.py
@@ -232,13 +232,14 @@ class KafkaGrobidWorker:
# way... print? log?
# 3. repeat!
- kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0")
+ print("Starting Kafka GROBID extraction worker...")
+ kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0")
produce_topic = kafka.topics[self.produce_topic]
consume_topic = kafka.topics[self.consume_topic]
- print("starting up...")
sequential_failures = 0
with produce_topic.get_producer(compression=pykafka.common.CompressionType.GZIP, sync=False) as producer:
+ print("Producing to: {}".format(self.produce_topic))
consumer = consume_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
managed=True,
@@ -246,17 +247,22 @@ class KafkaGrobidWorker:
auto_commit_enable=True,
auto_commit_interval_ms=60000, # 60 seconds
compacted_topic=True)
+ print("Consuming from: {} as {}".format(self.consume_topic, self.consumer_group))
+ sys.stdout.flush()
for msg in consumer:
- print("got a line! ")
grobid_output, status = self.do_work(msg.value.decode('utf-8'))
if grobid_output:
+ print("extracted {}: {}".format(
+ grobid_output.get('key'),
+ status))
+ sys.stdout.flush()
producer.produce(json.dumps(grobid_output).encode('utf-8'))
sequential_failures = 0
else:
- print("failed to extract: {}".format(status))
+ sys.stderr.write("failed to extract: {}\n".format(status))
sequential_failures += 1
if sequential_failures > 20:
- print("too many failures in a row, bailing out")
+ sys.stderr.write("too many failures in a row, bailing out\n")
sys.exit(-1)