| author | Bryan Newbold <bnewbold@archive.org> | 2018-11-21 17:54:21 -0800 |
|---|---|---|
| committer | Bryan Newbold <bnewbold@archive.org> | 2018-11-21 17:54:21 -0800 |
| commit | 4092a62bb03cd6e72b3f197e753593f6ee626182 (patch) | |
| tree | 130c584f7b8b27f92afd53cec6333579c83e0a66 /python | |
| parent | 6591acdda8b09289fabfa913b2f6bb51642fd38f (diff) | |
many improvements to kafka HBase inserter
Diffstat (limited to 'python')
| -rwxr-xr-x | python/kafka_grobid_hbase.py | 58 |

1 file changed, 29 insertions(+), 29 deletions(-)
```diff
diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py
index e6a53a1..986448b 100755
--- a/python/kafka_grobid_hbase.py
+++ b/python/kafka_grobid_hbase.py
@@ -46,6 +46,25 @@ class KafkaGrobidHbaseWorker:
         self.hbase_table = kwargs['hbase_table']
         self.hb_table = None # connection initialized in run()
 
+    def convert_tei(self, info):
+
+        # Convert TEI XML to JSON
+        try:
+            info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
+        except xml.etree.ElementTree.ParseError:
+            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
+            return info, info['grobid0:status']
+        except ValueError:
+            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
+            return info, info['grobid0:status']
+
+        tei_metadata = info['grobid0:tei_json'].copy()
+        for k in ('body', 'annex'):
+            # Remove fulltext (copywritted) content
+            tei_metadata.pop(k, None)
+        info['grobid0:metadata'] = tei_metadata
+        return info, None
+
     def do_work(self, raw_line):
         """
         1. parse info JSON (with XML inside)
@@ -64,32 +83,20 @@ class KafkaGrobidHbaseWorker:
             # Note: this may not get "cleared" correctly
             sentry_client.extra_context(dict(row_key=key))
+        print("inserting line to HBase: {}".format(key))
 
-        # Need to decode 'str' back in to 'bytes' (from JSON serialization)
         if info.get('grobid0:tei_xml'):
+            # Need to decode 'str' back in to 'bytes' (from JSON serialization)
             info['grobid0:tei_xml'] = info['grobid0:tei_xml'].encode('utf-8')
 
-        # Convert TEI XML to JSON
-        try:
-            info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
-        except xml.etree.ElementTree.ParseError:
-            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
-            return info, info['grobid0:status']
-        except ValueError:
-            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
-            return info, info['grobid0:status']
-
-        tei_metadata = info['grobid0:tei_json'].copy()
-        for k in ('body', 'annex'):
-            # Remove fulltext (copywritted) content
-            tei_metadata.pop(k, None)
-        info['grobid0:metadata'] = tei_metadata
+        if info.get('grobid0:status') == 200 and info.get('grobid0:tei_xml'):
+            info, status = self.convert_tei(info)
 
         # Decide what to bother inserting back into HBase
         # Basically, don't overwrite backfill fields.
 
         grobid_status_code = info.get('grobid0:status_code', None)
         for k in list(info.keys()):
-            if k.encode('utf-8') in ('f:c', 'file:mime', 'file:cdx'):
+            if k in ('f:c', 'file:mime', 'file:cdx'):
                 info.pop(k)
 
         # Convert fields to binary
@@ -110,14 +117,8 @@ class KafkaGrobidHbaseWorker:
         self.hb_table.put(key, info)
         #self.increment_counter('lines', 'success')
 
-        if extraction_status is not None:
-            return info, dict(status="partial", key=key,
-                grobid_status_code=grobid_status_code,
-                reason=extraction_status['reason'])
-        else:
-            return info, dict(status="success",
-                grobid_status_code=grobid_status_code, key=key,
-                extra=extraction_status)
+        return info, dict(status="success",
+            grobid_status_code=grobid_status_code, key=key)
 
     def run(self):
 
@@ -132,9 +133,8 @@ class KafkaGrobidHbaseWorker:
                 protocol="compact")
         except Exception:
             raise Exception("Couldn't connect to HBase using host: {}".format(host))
-        self.hb_table = hb_conn.table(self.options.hbase_table)
+        self.hb_table = hb_conn.table(self.hbase_table)
 
-        self.hb_table = hb_conn.table(self.options.hbase_table)
         kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0")
 
         consume_topic = kafka.topics[self.consume_topic]
@@ -146,12 +146,12 @@ class KafkaGrobidHbaseWorker:
                 auto_commit_enable=True,
                 compacted_topic=True)
         for msg in consumer:
-            print("got a line! ")
+            #print("got a line! ")
             grobid_output, status = self.do_work(msg.value.decode('utf-8'))
             if grobid_output:
                 sequential_failures = 0
             else:
-                print("failed to extract: {}".format(status))
+                print("failed to process: {}".format(status))
                 sequential_failures += 1
                 if sequential_failures > 20:
                     print("too many failures in a row, bailing out")
```
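The core refactor pulls the TEI-to-JSON conversion out of `do_work()` into a dedicated `convert_tei()` method: attempt the conversion, map each failure mode to a structured status, and strip the copyright-encumbered fulltext fields (`body`, `annex`) before the metadata is stored. A minimal standalone sketch of that pattern, with a stubbed `teixml2json` standing in for sandcrawler's real converter (the stub and the sample record are assumptions for illustration):

```python
import xml.etree.ElementTree


def teixml2json(tei_xml, encumbered=True):
    # Stand-in for sandcrawler's real TEI converter (assumption for this
    # sketch): parse the XML so malformed input raises ParseError, then
    # return a dict with metadata and fulltext fields.
    xml.etree.ElementTree.fromstring(tei_xml)
    return {"title": "example", "body": "fulltext ...", "annex": "more fulltext ..."}


def convert_tei(info):
    # Convert TEI XML to JSON; map each failure mode to a structured status
    try:
        info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
    except xml.etree.ElementTree.ParseError:
        info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
        return info, info['grobid0:status']
    except ValueError:
        info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
        return info, info['grobid0:status']

    # Keep a metadata-only copy: drop the copyright-encumbered fulltext fields
    tei_metadata = info['grobid0:tei_json'].copy()
    for k in ('body', 'annex'):
        tei_metadata.pop(k, None)
    info['grobid0:metadata'] = tei_metadata
    return info, None


info, status = convert_tei({'grobid0:tei_xml': b'<TEI><text>ok</text></TEI>'})
assert status is None and 'body' not in info['grobid0:metadata']
```

Returning `(info, status)` lets the caller treat `status is None` as success while still persisting the failure reason on the record itself.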
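A subtler fix is in the backfill-field filter: the old code compared `k.encode('utf-8')` (a `bytes` value) against a tuple of `str` literals, a membership test that can never succeed in Python 3, so `f:c`, `file:mime`, and `file:cdx` were never dropped before the put. A quick demonstration of the mismatch:

```python
# In Python 3, bytes never compare equal to str, so this membership test is
# always False and the backfill fields were never popped before the put():
assert 'f:c'.encode('utf-8') not in ('f:c', 'file:mime', 'file:cdx')

# Comparing the str key directly matches as intended:
assert 'f:c' in ('f:c', 'file:mime', 'file:cdx')
```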
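The consumer loop's failure handling is also worth noting: per-message logging is quieter, and the worker tolerates isolated bad messages, bailing out only after more than 20 consecutive failures, with the counter reset on any success. A runnable sketch of that loop shape, with a stubbed `do_work()` and an in-memory message list standing in for the pykafka balanced consumer (both assumptions for illustration):

```python
def do_work(raw_line):
    # Stub standing in for KafkaGrobidHbaseWorker.do_work() (assumption:
    # returns (output, status), with output None on failure)
    if not raw_line.strip():
        return None, dict(status="fail", reason="empty line")
    return {"parsed": raw_line}, dict(status="success")


# In-memory stand-in for the pykafka balanced consumer's message stream
messages = [b"good line", b"", b"another good line"]

sequential_failures = 0
for msg in messages:
    grobid_output, status = do_work(msg.decode('utf-8'))
    if grobid_output:
        # Any success resets the failure streak
        sequential_failures = 0
    else:
        print("failed to process: {}".format(status))
        sequential_failures += 1
        if sequential_failures > 20:
            print("too many failures in a row, bailing out")
            break
```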