#!/usr/bin/env python3
"""
Kafka worker that consumes GROBID output from Kafka and pushes into HBase.

Based on the ungrobided Hadoop job code.

TODO: binary conversion in 'grobided' topic? shouldn't be, do that here,
as well as all TEI extraction/parsing

Requires:
- requests
- pykafka
- happybase
- raven
"""

# XXX: some broken MRO thing going on in here due to python3 object wrangling
# in `wayback` library. Means we can't run pylint.
# pylint: skip-file

import sys
import xml.etree.ElementTree
import json
import raven
import struct
import requests
import argparse
import happybase
import pykafka

from common import parse_ungrobided_line
from grobid2json import teixml2json

# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
sentry_client = raven.Client()

# Specific poison-pill rows we should skip
KEY_DENYLIST = (
    'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT',    # "failed to guess ARC header format"
)


class KafkaGrobidHbaseWorker:

    def __init__(self, kafka_hosts, consume_topic, **kwargs):
        self.consume_topic = consume_topic
        self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert2')
        self.kafka_hosts = kafka_hosts or 'localhost:9092'
        self.hbase_host = kwargs['hbase_host']
        self.hbase_table_name = kwargs['hbase_table']
        self.hb_table = None  # connection initialized in run()

    def convert_tei(self, info):

        # Convert TEI XML to JSON
        try:
            info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
        except xml.etree.ElementTree.ParseError:
            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
            return info, info['grobid0:status']
        except ValueError:
            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
            return info, info['grobid0:status']

        tei_metadata = info['grobid0:tei_json'].copy()
        for k in ('body', 'annex'):
            # Remove fulltext (copyrighted) content
            tei_metadata.pop(k, None)
        info['grobid0:metadata'] = tei_metadata
        return info, None

    def do_work(self, raw_line):
        """
        1. parse info JSON (with XML inside)
        2. do XML -> JSON conversions
        3. push to HBase

        Returns a (grobid_output, status) tuple: grobid_output is the info
        dict on success and None on skip/failure; status is a small dict
        describing the outcome.
        """

        # Parse line and filter down
        info = json.loads(raw_line)
        key = info['key']
        if key in KEY_DENYLIST:
            #self.increment_counter('lines', 'denylist')
            return None, dict(status='denylist', key=key)

        # Note: this may not get "cleared" correctly
        sentry_client.extra_context(dict(row_key=key))
        print("inserting line to HBase: {}".format(key))

        if info.get('grobid0:tei_xml'):
            # Need to encode 'str' back into 'bytes' (it was decoded to a
            # string for JSON serialization)
            info['grobid0:tei_xml'] = info['grobid0:tei_xml'].encode('utf-8')

        if info.get('grobid0:status') == 200 and info.get('grobid0:tei_xml'):
            info, status = self.convert_tei(info)

        # Decide what to bother inserting back into HBase.
        # Basically, don't overwrite backfill fields.
        grobid_status_code = info.get('grobid0:status_code', None)
        for k in list(info.keys()):
            if k in ('f:c', 'file:mime', 'file:cdx'):
                info.pop(k)

        # Convert field values to the bytes/strings that HBase expects
        for k in list(info.keys()):
            if info[k] is None:
                info.pop(k)
            # NOTE: we're not actually sending these f:*, file:* keys...
            elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
                       'grobid0:metadata'):
                assert type(info[k]) == dict
                info[k] = json.dumps(info[k], sort_keys=True, indent=None)
            elif k in ('file:size', 'grobid0:status_code'):
                # encode as int64 in network byte order
                if info[k] != {} and info[k] is not None:
                    info[k] = struct.pack('!q', info[k])

        key = info.pop('key')
        self.hb_table.put(key, info)
        #self.increment_counter('lines', 'success')

        return info, dict(status="success",
            grobid_status_code=grobid_status_code, key=key)

    def run(self):

        # 1. start consumer (in managed/balanced fashion, with consumer group)
        # 2. for each message, do the work (insert into HBase); print/log the
        #    outcome either way
        # 3. repeat!

        print("Starting grobid-hbase-worker...")
        try:
            host = self.hbase_host
            hb_conn = happybase.Connection(host=host, transport="framed",
                protocol="compact")
        except Exception:
            raise Exception("Couldn't connect to HBase using host: {}".format(host))
        self.hb_table = hb_conn.table(self.hbase_table_name)
        print("HBase inserting into {}".format(self.hbase_table_name))

        kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0")

        consume_topic = kafka.topics[self.consume_topic]

        sequential_failures = 0
        consumer = consume_topic.get_balanced_consumer(
            consumer_group=self.consumer_group,
            managed=True,
            auto_commit_enable=True,
            # needed to avoid MessageSet decode errors
            fetch_message_max_bytes=4*1024*1024,
            # LATEST because it's better to miss a few messages than to waste
            # time re-processing old ones
            auto_offset_reset=pykafka.common.OffsetType.LATEST,
            compacted_topic=True)
        print("Kafka consuming {} in group {}".format(
            self.consume_topic,
            self.consumer_group))

        for msg in consumer:
            #print("got a line! ")
            grobid_output, status = self.do_work(msg.value.decode('utf-8'))
            if grobid_output:
                sequential_failures = 0
            else:
                sys.stderr.write("Failed to process GROBID extraction output: {}\n".format(status))
                sequential_failures += 1
                if sequential_failures > 20:
                    sys.stderr.write("too many failures in a row, bailing out\n")
                    sys.exit(-1)


@sentry_client.capture_exceptions
def main():

    parser = argparse.ArgumentParser()
    parser.add_argument('--kafka-hosts',
                        default="localhost:9092",
                        help="list of Kafka brokers (host/port) to use")
    parser.add_argument('--kafka-env',
                        default="qa",
                        help="eg, 'qa' or 'prod'")
    parser.add_argument('--consume-topic',
                        default=None,
                        help="Kafka topic to consume from")
    parser.add_argument('--hbase-table',
                        type=str,
                        default='wbgrp-journal-extract-0-qa',
                        help='HBase table to backfill into (must exist)')
    parser.add_argument('--hbase-host',
                        type=str,
                        default='localhost',
                        help='HBase thrift API host to connect to')
    args = parser.parse_args()

    if args.consume_topic is None:
        args.consume_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env)

    worker = KafkaGrobidHbaseWorker(**args.__dict__)
    worker.run()


if __name__ == '__main__':  # pragma: no cover
    main()
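
# The block below is a usage sketch (kept as comments so it never executes on
# import). It shows one way to drive the worker directly from Python rather
# than via the CLI in main(); the broker, host, table, and topic values are
# illustrative assumptions that mirror the argparse defaults above.
#
#   worker = KafkaGrobidHbaseWorker(
#       kafka_hosts="localhost:9092",
#       consume_topic="sandcrawler-qa.grobid-output",
#       hbase_host="localhost",
#       hbase_table="wbgrp-journal-extract-0-qa",
#   )
#   worker.run()   # blocks, consuming messages and inserting rows into HBase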