author     Bryan Newbold <bnewbold@archive.org>  2018-11-21 17:54:21 -0800
committer  Bryan Newbold <bnewbold@archive.org>  2018-11-21 17:54:21 -0800
commit     4092a62bb03cd6e72b3f197e753593f6ee626182 (patch)
tree       130c584f7b8b27f92afd53cec6333579c83e0a66
parent     6591acdda8b09289fabfa913b2f6bb51642fd38f (diff)
download   sandcrawler-4092a62bb03cd6e72b3f197e753593f6ee626182.tar.gz
           sandcrawler-4092a62bb03cd6e72b3f197e753593f6ee626182.zip
many improvements to kafka HBase inserter
-rwxr-xr-x  python/kafka_grobid_hbase.py  58
1 file changed, 29 insertions, 29 deletions
diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py
index e6a53a1..986448b 100755
--- a/python/kafka_grobid_hbase.py
+++ b/python/kafka_grobid_hbase.py
@@ -46,6 +46,25 @@ class KafkaGrobidHbaseWorker:
         self.hbase_table = kwargs['hbase_table']
         self.hb_table = None # connection initialized in run()
+    def convert_tei(self, info):
+
+        # Convert TEI XML to JSON
+        try:
+            info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
+        except xml.etree.ElementTree.ParseError:
+            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
+            return info, info['grobid0:status']
+        except ValueError:
+            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
+            return info, info['grobid0:status']
+
+        tei_metadata = info['grobid0:tei_json'].copy()
+        for k in ('body', 'annex'):
+            # Remove fulltext (copywritted) content
+            tei_metadata.pop(k, None)
+        info['grobid0:metadata'] = tei_metadata
+        return info, None
+
     def do_work(self, raw_line):
         """
         1. parse info JSON (with XML inside)
@@ -64,32 +83,20 @@ class KafkaGrobidHbaseWorker:
         # Note: this may not get "cleared" correctly
         sentry_client.extra_context(dict(row_key=key))
+        print("inserting line to HBase: {}".format(key))
-        # Need to decode 'str' back in to 'bytes' (from JSON serialization)
         if info.get('grobid0:tei_xml'):
+            # Need to decode 'str' back in to 'bytes' (from JSON serialization)
             info['grobid0:tei_xml'] = info['grobid0:tei_xml'].encode('utf-8')
-        # Convert TEI XML to JSON
-        try:
-            info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
-        except xml.etree.ElementTree.ParseError:
-            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
-            return info, info['grobid0:status']
-        except ValueError:
-            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
-            return info, info['grobid0:status']
-
-        tei_metadata = info['grobid0:tei_json'].copy()
-        for k in ('body', 'annex'):
-            # Remove fulltext (copywritted) content
-            tei_metadata.pop(k, None)
-        info['grobid0:metadata'] = tei_metadata
+        if info.get('grobid0:status') == 200 and info.get('grobid0:tei_xml'):
+            info, status = self.convert_tei(info)
         # Decide what to bother inserting back into HBase
         # Basically, don't overwrite backfill fields.
         grobid_status_code = info.get('grobid0:status_code', None)
         for k in list(info.keys()):
-            if k.encode('utf-8') in ('f:c', 'file:mime', 'file:cdx'):
+            if k in ('f:c', 'file:mime', 'file:cdx'):
                 info.pop(k)
         # Convert fields to binary
@@ -110,14 +117,8 @@ class KafkaGrobidHbaseWorker:
         self.hb_table.put(key, info)
         #self.increment_counter('lines', 'success')
-        if extraction_status is not None:
-            return info, dict(status="partial", key=key,
-                grobid_status_code=grobid_status_code,
-                reason=extraction_status['reason'])
-        else:
-            return info, dict(status="success",
-                grobid_status_code=grobid_status_code, key=key,
-                extra=extraction_status)
+        return info, dict(status="success",
+            grobid_status_code=grobid_status_code, key=key)
     def run(self):
@@ -132,9 +133,8 @@ class KafkaGrobidHbaseWorker:
                 protocol="compact")
         except Exception:
             raise Exception("Couldn't connect to HBase using host: {}".format(host))
-        self.hb_table = hb_conn.table(self.options.hbase_table)
+        self.hb_table = hb_conn.table(self.hbase_table)
-        self.hb_table = hb_conn.table(self.options.hbase_table)
         kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0")
         consume_topic = kafka.topics[self.consume_topic]
@@ -146,12 +146,12 @@ class KafkaGrobidHbaseWorker:
             auto_commit_enable=True,
             compacted_topic=True)
         for msg in consumer:
-            print("got a line! ")
+            #print("got a line! ")
             grobid_output, status = self.do_work(msg.value.decode('utf-8'))
             if grobid_output:
                 sequential_failures = 0
             else:
-                print("failed to process: {}".format(status))
+                print("failed to process: {}".format(status))
                 sequential_failures += 1
                 if sequential_failures > 20:
                     print("too many failures in a row, bailing out")