diff options
author | Bryan Newbold <bnewbold@archive.org> | 2018-11-21 13:08:26 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@archive.org> | 2018-11-21 13:08:26 -0800 |
commit | 9430558300df7e83442b771f139450c2e754f975 (patch) | |
tree | 087492a83b77f10845d5f054fa7be012cf2546f6 | |
parent | 2e6254251549dd932659e0a7f8bf6cda8e8abdf3 (diff) | |
download | sandcrawler-9430558300df7e83442b771f139450c2e754f975.tar.gz sandcrawler-9430558300df7e83442b771f139450c2e754f975.zip |
kafka_grobid tweaks for deployment; delay insert decision
-rwxr-xr-x | python/kafka_grobid.py | 30 |
1 files changed, 9 insertions, 21 deletions
diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py index 31a3a55..b12b492 100755 --- a/python/kafka_grobid.py +++ b/python/kafka_grobid.py @@ -198,29 +198,9 @@ class KafkaGrobidWorker: return None, status extraction_status = status - # Decide what to bother inserting back into HBase - # Basically, don't overwrite backfill fields. - grobid_status_code = info.get('grobid0:status_code', None) - for k in list(info.keys()): - if k.encode('utf-8') in ('f:c', 'file:mime', 'file:cdx'): - info.pop(k) - - # Convert fields to binary - for k in list(info.keys()): - if info[k] is None: - info.pop(k) - # NOTE: we're not actually sending these f:*, file:* keys... - elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', - 'grobid0:metadata'): - assert type(info[k]) == dict - info[k] = json.dumps(info[k], sort_keys=True, indent=None) - elif k in ('file:size', 'grobid0:status_code'): - # encode as int64 in network byte order - if info[k] != {} and info[k] != None: - info[k] = struct.pack('!q', info[k]) - #self.increment_counter('lines', 'success') + grobid_status_code = info.get('grobid0:status_code', None) if extraction_status is not None: return info, dict(status="partial", key=key, grobid_status_code=grobid_status_code, @@ -272,6 +252,9 @@ def main(): parser.add_argument('--kafka-hosts', default="localhost:9092", help="list of Kafka brokers (host/port) to use") + parser.add_argument('--kafka-env', + default="qa", + help="eg, 'qa' or 'prod'") parser.add_argument('--consume-topic', default="sandcrawler-qa.ungrobided", help="Kafka topic to consume from") @@ -288,6 +271,11 @@ def main(): help='URI where WARCs can be found') args = parser.parse_args() + if args.consume_topic is None: + args.consume_topic = "sandcrawler-{}.ungrobided".format(args.kafka_env) + if args.produce_topic is None: + args.produce_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env) + worker = KafkaGrobidWorker(**args.__dict__) worker.run() |