diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/kafka_grobid.py | 331 |
1 files changed, 0 insertions, 331 deletions
diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py deleted file mode 100755 index b0920bc..0000000 --- a/python/kafka_grobid.py +++ /dev/null @@ -1,331 +0,0 @@ -#!/usr/bin/env python3 -""" -DEPRECATED: this worker uses old kafka topics and an old schema. Use -`sandcrawler_worker.py` instead. - -Kafka worker that does GROBID extraction from one queue and into another. - -Based on the ungrobided Hadoop job code. Does not talk to HBase at all, just -petabox and GROBID. Will delegate tasks to random GROBID workers. - -Lines (tasks) are enqueued using a trivial kafkacat invocation; output is -persisted in Kakfa (in compressed format), and also drained into HBase by a -second worker. - -Schema of tasks is the 'ungrobided' TSV output. Schema of output is JSON with -keys: - - "key": SHA1 in base32 with prefix, eg, "sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT" - "grobid0:status_code": HTTP status code (integer) - "grobid0:status": dict/json - "grobid0:tei_xml": xml as a single string - "f:c": dict/json from input - "file:mime": string from input - "file:cdx": dict/json from input - # NOT grobid0:tei_json, grobid0:metadata, or grobid0:quality, which can be - # re-derived from tei_xml - -Requires: -- requests -- pykafka -- wayback/GWB libraries -""" - -# XXX: some broken MRO thing going on in here due to python3 object wrangling -# in `wayback` library. Means we can't run pylint. -# pylint: skip-file - -import os -import sys -import xml -import json -import raven -import struct -import argparse -import requests -import pykafka -import wayback.exception -from http.client import IncompleteRead -from wayback.resourcestore import ResourceStore -from gwb.loader import CDXLoaderFactory - -from common import parse_ungrobided_line -from grobid2json import teixml2json - -# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable -sentry_client = raven.Client() - -# Specific poison-pill rows we should skip -KEY_DENYLIST = ( - 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format" -) - -class KafkaGrobidWorker: - - def __init__(self, kafka_hosts, consume_topic, produce_topic, **kwargs): - self.consume_topic = consume_topic - self.produce_topic = produce_topic - self.consumer_group = kwargs.get('consumer_group', 'grobid-extraction') - self.kafka_hosts = kafka_hosts or 'localhost:9092' - self.grobid_uri = kwargs.get('grobid_uri') - # /serve/ instead of /download/ doesn't record view count - self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/') - # gwb library will fall back to reading from /opt/.petabox/webdata.secret - self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET')) - self.warc_uri_prefix = kwargs.get('warc_uri_prefix') - self.mime_filter = ['application/pdf'] - self.rstore = None - self.produce_max_request_size = 20000000 # Kafka producer batch size tuning; also limit on size of single extracted document - - def grobid_process_fulltext(self, content): - r = requests.post(self.grobid_uri + "/api/processFulltextDocument", - files={'input': content}) - return r - - def parse_line(self, raw_line): - """Line should be TSV and have non-null fields: - - - key (string) (separated in Kafka case) - - f:c (string, json) - - file:mime (string) - - file:cdx (string, json) - """ - - if (raw_line.startswith(' ') or raw_line.startswith('#') or raw_line.startswith('\t')): - return None, dict(status="invalid", reason="line prefix", input=raw_line) - - info = parse_ungrobided_line(raw_line) - if info is None: - return None, dict(status="invalid", reason="ungrobided parse") - - if info['file:mime'] not in self.mime_filter: - return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime']) - - # If warc is not item/file.(w)arc.gz form, skip it - if len(info['file:cdx']['warc'].split('/')) != 2: - return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc']) - - return info, None - - def fetch_warc_content(self, warc_path, offset, c_size): - warc_uri = self.warc_uri_prefix + warc_path - if not self.rstore: - self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory( - webdata_secret=self.petabox_webdata_secret, - download_base_url=self.petabox_base_url)) - try: - gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) - except wayback.exception.ResourceUnavailable: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (ResourceUnavailable)") - except ValueError as ve: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) - except EOFError as eofe: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) - except TypeError as te: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) - # Note: could consider a generic "except Exception" here, as we get so - # many petabox errors. Do want jobs to fail loud and clear when the - # whole cluster is down though. - - if gwb_record.get_status()[0] != 200: - return None, dict(status="error", - reason="archived HTTP response (WARC) was not 200", - warc_status=gwb_record.get_status()[0]) - - try: - raw_content = gwb_record.open_raw_content().read() - except IncompleteRead as ire: - return None, dict(status="error", - reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) - return raw_content, None - - def extract(self, info): - - # Fetch data from WARCs in petabox - original_content, status = self.fetch_warc_content( - info['file:cdx']['warc'], - info['file:cdx']['offset'], - info['file:cdx']['c_size']) - if status: - return None, status - - info['file:size'] = len(original_content) - - # Submit to GROBID - try: - grobid_response = self.grobid_process_fulltext(original_content) - except requests.exceptions.ConnectionError: - return None, dict(status="error", reason="connection to GROBID worker") - - info['grobid0:status_code'] = grobid_response.status_code - - # 4 MByte XML size limit; don't record GROBID status on this path - if len(grobid_response.content) > 4000000: - info['grobid0:status'] = {'status': 'oversize'} - return info, dict(status="oversize", reason="TEI response was too large") - - if grobid_response.status_code != 200: - # response.text is .content decoded as utf-8 - info['grobid0:status'] = dict(status='error', description=grobid_response.text) - return info, dict(status="error", reason="non-200 GROBID HTTP status", - extra=grobid_response.text) - - info['grobid0:status'] = {'status': 'partial'} - info['grobid0:tei_xml'] = grobid_response.content - info['grobid0:status'] = {'status': 'success'} - - return info, None - - def do_work(self, raw_line): - """ - 1. parse filtered line - 2. fetch data from wayback - 3. submit to GROBID - 4. convert GROBID response to JSON (and metadata) - 6. determine "quality" - 6. produce result to kafka - - Returns: (grobid_output, status) (both are None or dict) - If grobid_output is None, error was recovered, status returned. - Otherwise, we were successful; grobid_output should be JSON serialized - and published to kafka. - """ - - #self.increment_counter('lines', 'total') - - # Parse line and filter down - info, status = self.parse_line(raw_line) - if info is None: - #self.increment_counter('lines', status['status']) - return None, status - key = info['key'] - if key in KEY_DENYLIST: - #self.increment_counter('lines', 'denylist') - return None, dict(status='denylist', key=key) - - # Note: this may not get "cleared" correctly - sentry_client.extra_context(dict( - row_key=key, - cdx_url=info['file:cdx']['url'], - cdx_dt=info['file:cdx']['dt'], - cdx_warc=info['file:cdx']['warc'], - )) - - # Do the extraction - info, status = self.extract(info) - if info is None: - #self.increment_counter('lines', status['status']) - status['key'] = key - return None, status - extraction_status = status - - # Need to encode 'bytes' as 'str' for JSON serialization - if info.get('grobid0:tei_xml'): - info['grobid0:tei_xml'] = info['grobid0:tei_xml'].decode('utf-8') - - #self.increment_counter('lines', 'success') - - grobid_status_code = info.get('grobid0:status_code', None) - if extraction_status is not None: - return info, dict(status="partial", key=key, - grobid_status_code=grobid_status_code, - reason=extraction_status['reason']) - else: - return info, dict(status="success", - grobid_status_code=grobid_status_code, key=key, - extra=extraction_status) - - def run(self): - - # 1. start consumer (in managed/balanced fashion, with consumer group) - # 2. for each thingie, do the work; if success publish to kafka; either - # way... print? log? - # 3. repeat! - - print("Starting Kafka GROBID extraction worker...") - kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0") - produce_topic = kafka.topics[self.produce_topic] - consume_topic = kafka.topics[self.consume_topic] - - sequential_failures = 0 - # Configure producer to basically *immediately* publish messages, - # one-at-a-time, but asynchronously (don't block). Fetch and GROBID - # process takes a while, and we want to send as soon as processing is - # done. - with produce_topic.get_producer(sync=False, - compression=pykafka.common.CompressionType.GZIP, - retry_backoff_ms=250, - max_queued_messages=50, - min_queued_messages=10, - linger_ms=5000, - max_request_size=self.produce_max_request_size) as producer: - print("Producing to: {}".format(self.produce_topic)) - consumer = consume_topic.get_balanced_consumer( - consumer_group=self.consumer_group, - managed=True, - auto_commit_enable=True, - auto_commit_interval_ms=30000, # 30 seconds - # LATEST because best to miss processing than waste time re-process - auto_offset_reset=pykafka.common.OffsetType.LATEST, - queued_max_messages=50, - compacted_topic=True) - print("Consuming from: {} as {}".format(self.consume_topic, self.consumer_group)) - sys.stdout.flush() - for msg in consumer: - grobid_output, status = self.do_work(msg.value.decode('utf-8')) - if grobid_output: - print("extracted {}: {}".format( - grobid_output.get('key'), - status)) - sys.stdout.flush() - producer.produce(json.dumps(grobid_output, sort_keys=True).encode('utf-8')) - sequential_failures = 0 - else: - print("failed to extract: {}".format(status), file=sys.stderr) - sequential_failures += 1 - if sequential_failures > 20: - print("too many failures in a row, bailing out", file=sys.stderr) - sys.exit(-1) - - -@sentry_client.capture_exceptions -def main(): - - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument('--kafka-hosts', - default="localhost:9092", - help="list of Kafka brokers (host/port) to use") - parser.add_argument('--kafka-env', - default="qa", - help="eg, 'qa' or 'prod'") - parser.add_argument('--consume-topic', - default=None, - help="Kafka topic to consume from") - parser.add_argument('--produce-topic', - default=None, - help="Kafka topic to produce to") - parser.add_argument('--grobid-uri', - type=str, - default='http://localhost:8070', - help='URI of GROBID API Server') - parser.add_argument('--warc-uri-prefix', - type=str, - default='https://archive.org/serve/', - help='URI where WARCs can be found') - args = parser.parse_args() - - if args.consume_topic is None: - args.consume_topic = "sandcrawler-{}.ungrobided".format(args.kafka_env) - if args.produce_topic is None: - args.produce_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env) - - worker = KafkaGrobidWorker(**args.__dict__) - worker.run() - -if __name__ == '__main__': # pragma: no cover - main() |