aboutsummaryrefslogtreecommitdiffstats
path: root/python_hadoop/kafka_grobid_hbase.py
blob: b52c3867cba8723b324074c664eddf4ac49e0984 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#!/usr/bin/env python3
"""
Kafka worker that consumes GROBID output from Kafka and pushes into HBase.

Based on the ungrobided Hadoop job code.

TODO: binary conversion in 'grobided' topic? shouldn't be, do that here, as well as all TEI extraction/parsing

Requires:
- requests
- pykafka
- happybase
- raven
"""

# XXX: some broken MRO thing going on in here due to python3 object wrangling
# in `wayback` library. Means we can't run pylint.
# pylint: skip-file

import sys
import xml
import json
import raven
import struct
import requests
import argparse
import happybase
import pykafka

from common import parse_ungrobided_line
from grobid2json import teixml2json

# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
# Used in this file to attach the current row key as extra context while
# processing a message, and to wrap main() so uncaught exceptions are captured.
sentry_client = raven.Client()

# Specific poison-pill rows we should skip
# Message keys listed here are returned from do_work() with status 'denylist'
# and are never written to HBase. The trailing comment on each entry records
# why that row was excluded.
KEY_DENYLIST = (
    'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT',    # "failed to guess ARC header format"
)

class KafkaGrobidHbaseWorker:
    """
    Kafka consumer that takes GROBID output messages (one JSON object per
    message) and writes each one as a row into an HBase table.

    Call run() to connect to HBase and Kafka and start the blocking consume
    loop; do_work() handles a single message.
    """

    def __init__(self, kafka_hosts, consume_topic, **kwargs):
        """
        kafka_hosts: Kafka broker list; a falsy value falls back to
            'localhost:9092'
        consume_topic: name of the Kafka topic to consume from

        Keyword arguments:
            hbase_host (required): HBase thrift host to connect to
            hbase_table (required): name of the HBase table to insert into
            consumer_group (optional): Kafka consumer group name, defaults to
                'grobid-hbase-insert2'

        Any other keyword arguments are accepted and ignored. Raises KeyError
        if `hbase_host` or `hbase_table` is missing.
        """
        self.consume_topic = consume_topic
        self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert2')
        self.kafka_hosts = kafka_hosts or 'localhost:9092'
        self.hbase_host = kwargs['hbase_host']
        self.hbase_table_name = kwargs['hbase_table']
        self.hb_table = None  # connection initialized in run()

    def convert_tei(self, info):
        """
        Convert the TEI XML in info['grobid0:tei_xml'] to JSON.

        On success, sets info['grobid0:tei_json'] (full conversion) and
        info['grobid0:metadata'] (same, minus the 'body' and 'annex' fulltext
        fields), and returns (info, None).

        On failure, sets info['grobid0:status'] to a dict describing the
        failure and returns (info, that dict).
        """
        # A bare `import xml` does not load the `xml.etree.ElementTree`
        # submodule; import the exception explicitly so the except clause
        # below does not depend on some other module having loaded it.
        from xml.etree.ElementTree import ParseError

        # Convert TEI XML to JSON
        try:
            info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
        except ParseError:
            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
            return info, info['grobid0:status']
        except ValueError:
            info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
            return info, info['grobid0:status']

        tei_metadata = info['grobid0:tei_json'].copy()
        for k in ('body', 'annex'):
            # Remove fulltext (copyrighted) content
            tei_metadata.pop(k, None)
        info['grobid0:metadata'] = tei_metadata
        return info, None

    @staticmethod
    def _should_convert_tei(info):
        """
        Return True when the message carries TEI XML from a successful GROBID
        call (status code 200).

        The integer code is in 'grobid0:status_code'; 'grobid0:status' holds a
        dict, so comparing it to 200 would never match.
        """
        return info.get('grobid0:status_code') == 200 and bool(info.get('grobid0:tei_xml'))

    @staticmethod
    def _encode_fields(info):
        """
        Encode column values in place for insertion: drop None values, dump
        dict-valued columns to JSON strings, and pack integer columns as int64
        in network byte order. Other columns are left as they are.
        """
        for k in list(info.keys()):
            value = info[k]
            if value is None:
                info.pop(k)
            # NOTE: we're not actually sending these f:*, file:* keys...
            elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
                    'grobid0:metadata'):
                assert isinstance(value, dict)
                info[k] = json.dumps(value, sort_keys=True, indent=None)
            elif k in ('file:size', 'grobid0:status_code'):
                # encode as int64 in network byte order
                if value != {}:
                    info[k] = struct.pack('!q', value)

    def do_work(self, raw_line):
        """
        Process a single message:

        1. parse info JSON (with XML inside)
        2. do XML -> JSON conversions
        3. push to HBase

        Returns an (info, status) tuple. For a denylisted key this is
        (None, dict(status='denylist', key=...)) and nothing is written.
        Otherwise `info` is the dict of columns that was put to HBase (with
        'key' removed) and `status` is
        dict(status="success", grobid_status_code=..., key=...).

        Errors from JSON parsing, field encoding, or the HBase put are not
        caught here and propagate to the caller.
        """

        # Parse line and filter down
        info = json.loads(raw_line)
        key = info['key']
        if key in KEY_DENYLIST:
            #self.increment_counter('lines', 'denylist')
            return None, dict(status='denylist', key=key)

        # Note: this may not get "cleared" correctly
        sentry_client.extra_context(dict(row_key=key))
        print("inserting line to HBase: {}".format(key))

        if info.get('grobid0:tei_xml'):
            # Need to decode 'str' back in to 'bytes' (from JSON serialization)
            info['grobid0:tei_xml'] = info['grobid0:tei_xml'].encode('utf-8')

        if self._should_convert_tei(info):
            # A conversion failure is recorded in info['grobid0:status'], and
            # the row is still inserted, so the returned status is not needed.
            info, _ = self.convert_tei(info)

        # Decide what to bother inserting back into HBase
        # Basically, don't overwrite backfill fields.
        grobid_status_code = info.get('grobid0:status_code', None)
        for k in ('f:c', 'file:mime', 'file:cdx'):
            info.pop(k, None)

        # Convert fields to binary
        self._encode_fields(info)

        key = info.pop('key')
        self.hb_table.put(key, info)
        #self.increment_counter('lines', 'success')

        return info, dict(status="success",
            grobid_status_code=grobid_status_code, key=key)

    def run(self):
        """
        Connect to HBase and Kafka, then consume messages forever, passing
        each one to do_work().

        A message for which do_work() returns a falsy first element (this
        includes denylisted keys) counts as a failure; more than 20 failures
        in a row terminates the process via sys.exit(-1).

        Raises Exception if the HBase connection cannot be created.
        """

        # 1. start consumer (in managed/balanced fashion, with consumer group)
        # 2. for each thingie, do the work; if success publish to kafka; either
        #    way... print? log?
        # 3. repeat!

        print("Starting grobid-hbase-worker...")
        host = self.hbase_host
        try:
            hb_conn = happybase.Connection(host=host, transport="framed",
                protocol="compact")
        except Exception as e:
            # Chain the original error so the root cause stays in the traceback
            raise Exception("Couldn't connect to HBase using host: {}".format(host)) from e
        self.hb_table = hb_conn.table(self.hbase_table_name)
        print("HBase inserting into {}".format(self.hbase_table_name))

        kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0")
        consume_topic = kafka.topics[self.consume_topic]

        sequential_failures = 0
        consumer = consume_topic.get_balanced_consumer(
            consumer_group=self.consumer_group,
            managed=True,
            auto_commit_enable=True,
            # needed to avoid MessageSet decode errors
            fetch_message_max_bytes=4*1024*1024,
            # LATEST because best to miss processing than waste time re-process
            auto_offset_reset=pykafka.common.OffsetType.LATEST,
            compacted_topic=True)
        print("Kafka consuming {} in group {}".format(
            self.consume_topic,
            self.consumer_group))

        for msg in consumer:
            #print("got a line! ")
            grobid_output, status = self.do_work(msg.value.decode('utf-8'))
            if grobid_output:
                sequential_failures = 0
            else:
                sys.stderr.write("Failed to process GROBID extraction output: {}\n".format(status))
                sequential_failures += 1
                if sequential_failures > 20:
                    sys.stderr.write("too many failures in a row, bailing out\n")
                    sys.exit(-1)


@sentry_client.capture_exceptions
def main():
    """
    Command-line entry point: parse options, build the worker, and run it.

    When --consume-topic is not given, the topic name is derived from
    --kafka-env as "sandcrawler-<env>.grobid-output". All parsed options are
    handed to KafkaGrobidHbaseWorker as keyword arguments.
    """
    cli = argparse.ArgumentParser()

    # (flag, add_argument keyword options), registered in this order
    option_specs = (
        ('--kafka-hosts', dict(
            default="localhost:9092",
            help="list of Kafka brokers (host/port) to use")),
        ('--kafka-env', dict(
            default="qa",
            help="eg, 'qa' or 'prod'")),
        ('--consume-topic', dict(
            default=None,
            help="Kafka topic to consume from")),
        ('--hbase-table', dict(
            type=str,
            default='wbgrp-journal-extract-0-qa',
            help='HBase table to backfill into (must exist)')),
        ('--hbase-host', dict(
            type=str,
            default='localhost',
            help='HBase thrift API host to connect to')),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()

    # Fall back to the per-environment default topic name
    if opts.consume_topic is None:
        opts.consume_topic = "sandcrawler-{}.grobid-output".format(opts.kafka_env)

    KafkaGrobidHbaseWorker(**vars(opts)).run()

# Run the worker only when this file is executed as a script, not on import.
if __name__ == '__main__': # pragma: no cover
    main()