aboutsummaryrefslogtreecommitdiffstats
path: root/python/kafka_grobid.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/kafka_grobid.py')
-rwxr-xr-xpython/kafka_grobid.py331
1 files changed, 0 insertions, 331 deletions
diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py
deleted file mode 100755
index b0920bc..0000000
--- a/python/kafka_grobid.py
+++ /dev/null
@@ -1,331 +0,0 @@
-#!/usr/bin/env python3
-"""
-DEPRECATED: this worker uses old kafka topics and an old schema. Use
-`sandcrawler_worker.py` instead.
-
-Kafka worker that does GROBID extraction from one queue and into another.
-
-Based on the ungrobided Hadoop job code. Does not talk to HBase at all, just
-petabox and GROBID. Will delegate tasks to random GROBID workers.
-
-Lines (tasks) are enqueued using a trivial kafkacat invocation; output is
-persisted in Kakfa (in compressed format), and also drained into HBase by a
-second worker.
-
-Schema of tasks is the 'ungrobided' TSV output. Schema of output is JSON with
-keys:
-
- "key": SHA1 in base32 with prefix, eg, "sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT"
- "grobid0:status_code": HTTP status code (integer)
- "grobid0:status": dict/json
- "grobid0:tei_xml": xml as a single string
- "f:c": dict/json from input
- "file:mime": string from input
- "file:cdx": dict/json from input
- # NOT grobid0:tei_json, grobid0:metadata, or grobid0:quality, which can be
- # re-derived from tei_xml
-
-Requires:
-- requests
-- pykafka
-- wayback/GWB libraries
-"""
-
-# XXX: some broken MRO thing going on in here due to python3 object wrangling
-# in `wayback` library. Means we can't run pylint.
-# pylint: skip-file
-
-import os
-import sys
-import xml
-import json
-import raven
-import struct
-import argparse
-import requests
-import pykafka
-import wayback.exception
-from http.client import IncompleteRead
-from wayback.resourcestore import ResourceStore
-from gwb.loader import CDXLoaderFactory
-
-from common import parse_ungrobided_line
-from grobid2json import teixml2json
-
-# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
-sentry_client = raven.Client()
-
-# Specific poison-pill rows we should skip
-KEY_DENYLIST = (
- 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format"
-)
-
-class KafkaGrobidWorker:
-
- def __init__(self, kafka_hosts, consume_topic, produce_topic, **kwargs):
- self.consume_topic = consume_topic
- self.produce_topic = produce_topic
- self.consumer_group = kwargs.get('consumer_group', 'grobid-extraction')
- self.kafka_hosts = kafka_hosts or 'localhost:9092'
- self.grobid_uri = kwargs.get('grobid_uri')
- # /serve/ instead of /download/ doesn't record view count
- self.petabox_base_url = kwargs.get('petabox_base_url', 'http://archive.org/serve/')
- # gwb library will fall back to reading from /opt/.petabox/webdata.secret
- self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
- self.warc_uri_prefix = kwargs.get('warc_uri_prefix')
- self.mime_filter = ['application/pdf']
- self.rstore = None
- self.produce_max_request_size = 20000000 # Kafka producer batch size tuning; also limit on size of single extracted document
-
- def grobid_process_fulltext(self, content):
- r = requests.post(self.grobid_uri + "/api/processFulltextDocument",
- files={'input': content})
- return r
-
- def parse_line(self, raw_line):
- """Line should be TSV and have non-null fields:
-
- - key (string) (separated in Kafka case)
- - f:c (string, json)
- - file:mime (string)
- - file:cdx (string, json)
- """
-
- if (raw_line.startswith(' ') or raw_line.startswith('#') or raw_line.startswith('\t')):
- return None, dict(status="invalid", reason="line prefix", input=raw_line)
-
- info = parse_ungrobided_line(raw_line)
- if info is None:
- return None, dict(status="invalid", reason="ungrobided parse")
-
- if info['file:mime'] not in self.mime_filter:
- return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime'])
-
- # If warc is not item/file.(w)arc.gz form, skip it
- if len(info['file:cdx']['warc'].split('/')) != 2:
- return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc'])
-
- return info, None
-
- def fetch_warc_content(self, warc_path, offset, c_size):
- warc_uri = self.warc_uri_prefix + warc_path
- if not self.rstore:
- self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
- webdata_secret=self.petabox_webdata_secret,
- download_base_url=self.petabox_base_url))
- try:
- gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
- except wayback.exception.ResourceUnavailable:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (ResourceUnavailable)")
- except ValueError as ve:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
- except EOFError as eofe:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
- except TypeError as te:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
- # Note: could consider a generic "except Exception" here, as we get so
- # many petabox errors. Do want jobs to fail loud and clear when the
- # whole cluster is down though.
-
- if gwb_record.get_status()[0] != 200:
- return None, dict(status="error",
- reason="archived HTTP response (WARC) was not 200",
- warc_status=gwb_record.get_status()[0])
-
- try:
- raw_content = gwb_record.open_raw_content().read()
- except IncompleteRead as ire:
- return None, dict(status="error",
- reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
- return raw_content, None
-
- def extract(self, info):
-
- # Fetch data from WARCs in petabox
- original_content, status = self.fetch_warc_content(
- info['file:cdx']['warc'],
- info['file:cdx']['offset'],
- info['file:cdx']['c_size'])
- if status:
- return None, status
-
- info['file:size'] = len(original_content)
-
- # Submit to GROBID
- try:
- grobid_response = self.grobid_process_fulltext(original_content)
- except requests.exceptions.ConnectionError:
- return None, dict(status="error", reason="connection to GROBID worker")
-
- info['grobid0:status_code'] = grobid_response.status_code
-
- # 4 MByte XML size limit; don't record GROBID status on this path
- if len(grobid_response.content) > 4000000:
- info['grobid0:status'] = {'status': 'oversize'}
- return info, dict(status="oversize", reason="TEI response was too large")
-
- if grobid_response.status_code != 200:
- # response.text is .content decoded as utf-8
- info['grobid0:status'] = dict(status='error', description=grobid_response.text)
- return info, dict(status="error", reason="non-200 GROBID HTTP status",
- extra=grobid_response.text)
-
- info['grobid0:status'] = {'status': 'partial'}
- info['grobid0:tei_xml'] = grobid_response.content
- info['grobid0:status'] = {'status': 'success'}
-
- return info, None
-
- def do_work(self, raw_line):
- """
- 1. parse filtered line
- 2. fetch data from wayback
- 3. submit to GROBID
- 4. convert GROBID response to JSON (and metadata)
- 6. determine "quality"
- 6. produce result to kafka
-
- Returns: (grobid_output, status) (both are None or dict)
- If grobid_output is None, error was recovered, status returned.
- Otherwise, we were successful; grobid_output should be JSON serialized
- and published to kafka.
- """
-
- #self.increment_counter('lines', 'total')
-
- # Parse line and filter down
- info, status = self.parse_line(raw_line)
- if info is None:
- #self.increment_counter('lines', status['status'])
- return None, status
- key = info['key']
- if key in KEY_DENYLIST:
- #self.increment_counter('lines', 'denylist')
- return None, dict(status='denylist', key=key)
-
- # Note: this may not get "cleared" correctly
- sentry_client.extra_context(dict(
- row_key=key,
- cdx_url=info['file:cdx']['url'],
- cdx_dt=info['file:cdx']['dt'],
- cdx_warc=info['file:cdx']['warc'],
- ))
-
- # Do the extraction
- info, status = self.extract(info)
- if info is None:
- #self.increment_counter('lines', status['status'])
- status['key'] = key
- return None, status
- extraction_status = status
-
- # Need to encode 'bytes' as 'str' for JSON serialization
- if info.get('grobid0:tei_xml'):
- info['grobid0:tei_xml'] = info['grobid0:tei_xml'].decode('utf-8')
-
- #self.increment_counter('lines', 'success')
-
- grobid_status_code = info.get('grobid0:status_code', None)
- if extraction_status is not None:
- return info, dict(status="partial", key=key,
- grobid_status_code=grobid_status_code,
- reason=extraction_status['reason'])
- else:
- return info, dict(status="success",
- grobid_status_code=grobid_status_code, key=key,
- extra=extraction_status)
-
- def run(self):
-
- # 1. start consumer (in managed/balanced fashion, with consumer group)
- # 2. for each thingie, do the work; if success publish to kafka; either
- # way... print? log?
- # 3. repeat!
-
- print("Starting Kafka GROBID extraction worker...")
- kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0")
- produce_topic = kafka.topics[self.produce_topic]
- consume_topic = kafka.topics[self.consume_topic]
-
- sequential_failures = 0
- # Configure producer to basically *immediately* publish messages,
- # one-at-a-time, but asynchronously (don't block). Fetch and GROBID
- # process takes a while, and we want to send as soon as processing is
- # done.
- with produce_topic.get_producer(sync=False,
- compression=pykafka.common.CompressionType.GZIP,
- retry_backoff_ms=250,
- max_queued_messages=50,
- min_queued_messages=10,
- linger_ms=5000,
- max_request_size=self.produce_max_request_size) as producer:
- print("Producing to: {}".format(self.produce_topic))
- consumer = consume_topic.get_balanced_consumer(
- consumer_group=self.consumer_group,
- managed=True,
- auto_commit_enable=True,
- auto_commit_interval_ms=30000, # 30 seconds
- # LATEST because best to miss processing than waste time re-process
- auto_offset_reset=pykafka.common.OffsetType.LATEST,
- queued_max_messages=50,
- compacted_topic=True)
- print("Consuming from: {} as {}".format(self.consume_topic, self.consumer_group))
- sys.stdout.flush()
- for msg in consumer:
- grobid_output, status = self.do_work(msg.value.decode('utf-8'))
- if grobid_output:
- print("extracted {}: {}".format(
- grobid_output.get('key'),
- status))
- sys.stdout.flush()
- producer.produce(json.dumps(grobid_output, sort_keys=True).encode('utf-8'))
- sequential_failures = 0
- else:
- print("failed to extract: {}".format(status), file=sys.stderr)
- sequential_failures += 1
- if sequential_failures > 20:
- print("too many failures in a row, bailing out", file=sys.stderr)
- sys.exit(-1)
-
-
-@sentry_client.capture_exceptions
-def main():
-
- parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
- parser.add_argument('--kafka-hosts',
- default="localhost:9092",
- help="list of Kafka brokers (host/port) to use")
- parser.add_argument('--kafka-env',
- default="qa",
- help="eg, 'qa' or 'prod'")
- parser.add_argument('--consume-topic',
- default=None,
- help="Kafka topic to consume from")
- parser.add_argument('--produce-topic',
- default=None,
- help="Kafka topic to produce to")
- parser.add_argument('--grobid-uri',
- type=str,
- default='http://localhost:8070',
- help='URI of GROBID API Server')
- parser.add_argument('--warc-uri-prefix',
- type=str,
- default='https://archive.org/serve/',
- help='URI where WARCs can be found')
- args = parser.parse_args()
-
- if args.consume_topic is None:
- args.consume_topic = "sandcrawler-{}.ungrobided".format(args.kafka_env)
- if args.produce_topic is None:
- args.produce_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env)
-
- worker = KafkaGrobidWorker(**args.__dict__)
- worker.run()
-
-if __name__ == '__main__': # pragma: no cover
- main()