-rwxr-xr-x  python/kafka_grobid.py  30
1 file changed, 9 insertions(+), 21 deletions(-)
diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py
index 31a3a55..b12b492 100755
--- a/python/kafka_grobid.py
+++ b/python/kafka_grobid.py
@@ -198,29 +198,9 @@ class KafkaGrobidWorker:
             return None, status
         extraction_status = status
 
-        # Decide what to bother inserting back into HBase
-        # Basically, don't overwrite backfill fields.
-        grobid_status_code = info.get('grobid0:status_code', None)
-        for k in list(info.keys()):
-            if k.encode('utf-8') in ('f:c', 'file:mime', 'file:cdx'):
-                info.pop(k)
-
-        # Convert fields to binary
-        for k in list(info.keys()):
-            if info[k] is None:
-                info.pop(k)
-            # NOTE: we're not actually sending these f:*, file:* keys...
-            elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
-                    'grobid0:metadata'):
-                assert type(info[k]) == dict
-                info[k] = json.dumps(info[k], sort_keys=True, indent=None)
-            elif k in ('file:size', 'grobid0:status_code'):
-                # encode as int64 in network byte order
-                if info[k] != {} and info[k] != None:
-                    info[k] = struct.pack('!q', info[k])
-
         #self.increment_counter('lines', 'success')
+        grobid_status_code = info.get('grobid0:status_code', None)
 
         if extraction_status is not None:
             return info, dict(status="partial", key=key,
                 grobid_status_code=grobid_status_code,
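
The block removed above was HBase-specific serialization: dict-valued cells became deterministic JSON, and integer cells such as grobid0:status_code were packed as a big-endian ("network byte order") int64. A minimal standalone sketch of that encoding; the decode_int64 helper is illustrative only, not part of this repo:

    import json
    import struct

    def encode_int64(value):
        # Big-endian ("network byte order") signed 64-bit integer,
        # matching the removed struct.pack('!q', ...) call.
        return struct.pack('!q', value)

    def decode_int64(blob):
        # Illustrative inverse: recover the int from the 8-byte cell.
        return struct.unpack('!q', blob)[0]

    assert decode_int64(encode_int64(200)) == 200
    # Dict-valued cells were serialized as deterministic JSON:
    assert json.dumps({'b': 1, 'a': 2}, sort_keys=True, indent=None) == '{"a": 2, "b": 1}'
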
@@ -272,6 +252,9 @@ def main():
     parser.add_argument('--kafka-hosts',
         default="localhost:9092",
         help="list of Kafka brokers (host/port) to use")
+    parser.add_argument('--kafka-env',
+        default="qa",
+        help="eg, 'qa' or 'prod'")
     parser.add_argument('--consume-topic',
         default="sandcrawler-qa.ungrobided",
         help="Kafka topic to consume from")
@@ -288,6 +271,11 @@ def main():
         help='URI where WARCs can be found')
     args = parser.parse_args()
 
+    if args.consume_topic is None:
+        args.consume_topic = "sandcrawler-{}.ungrobided".format(args.kafka_env)
+    if args.produce_topic is None:
+        args.produce_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env)
+
     worker = KafkaGrobidWorker(**args.__dict__)
     worker.run()
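
A caveat on the topic-default logic added above: args.consume_topic can only be None if the corresponding argparse default is None, and --consume-topic is declared with the concrete default "sandcrawler-qa.ungrobided", so the fallback branch never fires as written. A minimal sketch of the presumably intended wiring; the default=None declarations and the --produce-topic definition are assumptions, not this commit's code:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--kafka-env',
        default="qa",
        help="eg, 'qa' or 'prod'")
    # Assumed change: topics default to None so --kafka-env can fill them in.
    parser.add_argument('--consume-topic',
        default=None,
        help="Kafka topic to consume from")
    parser.add_argument('--produce-topic',
        default=None,
        help="Kafka topic to produce into")
    args = parser.parse_args()

    if args.consume_topic is None:
        args.consume_topic = "sandcrawler-{}.ungrobided".format(args.kafka_env)
    if args.produce_topic is None:
        args.produce_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env)

With this pattern, passing --consume-topic explicitly still overrides the environment-derived name, while the common case needs only --kafka-env.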