From 9430558300df7e83442b771f139450c2e754f975 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 21 Nov 2018 13:08:26 -0800 Subject: kafka_grobid tweaks for deployment; delay insert decision --- python/kafka_grobid.py | 30 +++++++++--------------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py index 31a3a55..b12b492 100755 --- a/python/kafka_grobid.py +++ b/python/kafka_grobid.py @@ -198,29 +198,9 @@ class KafkaGrobidWorker: return None, status extraction_status = status - # Decide what to bother inserting back into HBase - # Basically, don't overwrite backfill fields. - grobid_status_code = info.get('grobid0:status_code', None) - for k in list(info.keys()): - if k.encode('utf-8') in ('f:c', 'file:mime', 'file:cdx'): - info.pop(k) - - # Convert fields to binary - for k in list(info.keys()): - if info[k] is None: - info.pop(k) - # NOTE: we're not actually sending these f:*, file:* keys... - elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', - 'grobid0:metadata'): - assert type(info[k]) == dict - info[k] = json.dumps(info[k], sort_keys=True, indent=None) - elif k in ('file:size', 'grobid0:status_code'): - # encode as int64 in network byte order - if info[k] != {} and info[k] != None: - info[k] = struct.pack('!q', info[k]) - #self.increment_counter('lines', 'success') + grobid_status_code = info.get('grobid0:status_code', None) if extraction_status is not None: return info, dict(status="partial", key=key, grobid_status_code=grobid_status_code, @@ -272,6 +252,9 @@ def main(): parser.add_argument('--kafka-hosts', default="localhost:9092", help="list of Kafka brokers (host/port) to use") + parser.add_argument('--kafka-env', + default="qa", + help="eg, 'qa' or 'prod'") parser.add_argument('--consume-topic', default="sandcrawler-qa.ungrobided", help="Kafka topic to consume from") @@ -288,6 +271,11 @@ def main(): help='URI where WARCs can be found') args = parser.parse_args() + if args.consume_topic is None: + args.consume_topic = "sandcrawler-{}.ungrobided".format(args.kafka_env) + if args.produce_topic is None: + args.produce_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env) + worker = KafkaGrobidWorker(**args.__dict__) worker.run() -- cgit v1.2.3