aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--kafka/grobid_kafka_notes.txt18
-rw-r--r--python/Pipfile1
-rwxr-xr-xpython/kafka_grobid.py295
3 files changed, 314 insertions, 0 deletions
diff --git a/kafka/grobid_kafka_notes.txt b/kafka/grobid_kafka_notes.txt
index f774291..26c450f 100644
--- a/kafka/grobid_kafka_notes.txt
+++ b/kafka/grobid_kafka_notes.txt
@@ -22,3 +22,21 @@ this...
Need to ensure we have compression enabled, for the GROBID output in
particular! Probably worth using "expensive" GZIP compression to get extra disk
savings; latency shouldn't be a big deal here.
+
+## Commands
+
+Load up some example lines, without partition key:
+
+ head -n10 python/tests/files/example_ungrobided.tsv | kafkacat -P -b localhost:9092 -t sandcrawler-qa.ungrobided
+
+Load up some example lines, with partition key:
+
+ head -n10 python/tests/files/example_ungrobided.tsv | awk -F'\t' '{print $1 "\t" $0}' | kafkacat -K$'\t' -P -b localhost:9092 -t sandcrawler-qa.ungrobided
+
+Check ungrobided topic:
+
+ kafkacat -C -b localhost:9092 -t sandcrawler-qa.ungrobided
+
+Check grobid output:
+
+ kafkacat -C -b localhost:9092 -t sandcrawler-qa.grobided
diff --git a/python/Pipfile b/python/Pipfile
index 129b23e..fcc8589 100644
--- a/python/Pipfile
+++ b/python/Pipfile
@@ -25,6 +25,7 @@ requests = "*"
wayback = {version=">=0.2.1.2", index="ia"}
xmltodict = "*"
raven = "*"
+pykafka = "*"
[requires]
python_version = "3.5"
diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py
new file mode 100755
index 0000000..31a3a55
--- /dev/null
+++ b/python/kafka_grobid.py
@@ -0,0 +1,295 @@
+#!/usr/bin/env python3
+"""
+Kafka worker that does GROBID extraction from one queue and into another.
+
+Based on the ungrobided Hadoop job code. Does not talk to HBase at all, just
+petabox and GROBID. Will delegate tasks to random GROBID workers.
+
+Lines (tasks) are enqueued using a trivial kafkacat invocation; output is
+persisted in Kakfa (in compressed format), and also drained into HBase by a
+second worker.
+
+Schema of tasks is the 'ungrobided' TSV output. Schema of output is JSON with
+keys:
+
+ "key": SHA1 in base32 with prefix, eg, "sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT"
+ "grobid0:status_code": HTTP status code (integer)
+ "grobid0:status": dict/json
+ "grobid0:tei_xml": xml as a single string
+ "f:c": dict/json from input
+ "file:mime": string from input
+ "file:cdx": dict/json from input
+ # NOT grobid0:tei_json, grobid0:metadata, or grobid0:quality, which can be
+ # re-derived from tei_xml
+
+Requires:
+- requests
+- pykafka
+- wayback/GWB libraries
+"""
+
+# XXX: some broken MRO thing going on in here due to python3 object wrangling
+# in `wayback` library. Means we can't run pylint.
+# pylint: skip-file
+
+import xml
+import json
+import raven
+import struct
+import requests
+import argparse
+import pykafka
+import wayback.exception
+from wayback.resource import Resource
+from wayback.resource import ArcResource
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory
+
+from common import parse_ungrobided_line
+from grobid2json import teixml2json
+
+# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
+sentry_client = raven.Client()
+
+# Specific poison-pill rows we should skip
+KEY_DENYLIST = (
+ 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format"
+)
+
+class KafkaGrobidWorker:
+
+ def __init__(self, kafka_hosts, consume_topic, produce_topic, **kwargs):
+ self.consume_topic = consume_topic
+ self.produce_topic = produce_topic
+ self.consumer_group = kwargs.get('consumer_group', 'extraction')
+ self.kafka_hosts = kafka_hosts or 'localhost:9092'
+ self.grobid_uri = kwargs.get('grobid_uri')
+ self.warc_uri_prefix = kwargs.get('warc_uri_prefix')
+ self.mime_filter = ['application/pdf']
+ self.rstore = None
+
+ def grobid_process_fulltext(self, content):
+ r = requests.post(self.grobid_uri + "/api/processFulltextDocument",
+ files={'input': content})
+ return r
+
+ def parse_line(self, raw_line):
+ """Line should be TSV and have non-null fields:
+
+ - key (string) (separated in Kafka case)
+ - f:c (string, json)
+ - file:mime (string)
+ - file:cdx (string, json)
+ """
+
+ if (raw_line.startswith(' ') or raw_line.startswith('#') or raw_line.startswith('\t')):
+ return None, dict(status="invalid", reason="line prefix", input=raw_line)
+
+ info = parse_ungrobided_line(raw_line)
+ if info is None:
+ return None, dict(status="invalid", reason="ungrobided parse")
+
+ if info['file:mime'] not in self.mime_filter:
+ return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime'])
+
+ # If warc is not item/file.(w)arc.gz form, skip it
+ if len(info['file:cdx']['warc'].split('/')) != 2:
+ return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc'])
+
+ return info, None
+
+ def fetch_warc_content(self, warc_path, offset, c_size):
+ warc_uri = self.warc_uri_prefix + warc_path
+ if not self.rstore:
+ self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
+ try:
+ gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
+ except wayback.exception.ResourceUnavailable:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox (ResourceUnavailable)")
+ except ValueError as ve:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
+ except EOFError as eofe:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ # Note: could consider a generic "except Exception" here, as we get so
+ # many petabox errors. Do want jobs to fail loud and clear when the
+ # whole cluster is down though.
+
+ if gwb_record.get_status()[0] != 200:
+ return None, dict(status="error",
+ reason="archived HTTP response (WARC) was not 200",
+ warc_status=gwb_record.get_status()[0])
+ return gwb_record.open_raw_content().read(), None
+
+ def extract(self, info):
+
+ # Fetch data from WARCs in petabox
+ original_content, status = self.fetch_warc_content(
+ info['file:cdx']['warc'],
+ info['file:cdx']['offset'],
+ info['file:cdx']['c_size'])
+ if status:
+ return None, status
+
+ info['file:size'] = len(original_content)
+
+ # Submit to GROBID
+ try:
+ grobid_response = self.grobid_process_fulltext(original_content)
+ except requests.exceptions.ConnectionError:
+ return None, dict(status="error", reason="connection to GROBID worker")
+
+ info['grobid0:status_code'] = grobid_response.status_code
+
+ # 4 MByte XML size limit; don't record GROBID status on this path
+ if len(grobid_response.content) > 4000000:
+ info['grobid0:status'] = {'status': 'oversize'}
+ return info, dict(status="oversize", reason="TEI response was too large")
+
+ if grobid_response.status_code != 200:
+ # response.text is .content decoded as utf-8
+ info['grobid0:status'] = dict(status='error', description=grobid_response.text)
+ return info, dict(status="error", reason="non-200 GROBID HTTP status",
+ extra=grobid_response.text)
+
+ info['grobid0:status'] = {'status': 'partial'}
+ info['grobid0:tei_xml'] = grobid_response.content
+ info['grobid0:status'] = {'status': 'success'}
+
+ return info
+
+ def do_work(self, raw_line):
+ """
+ 1. parse filtered line
+ 2. fetch data from wayback
+ 3. submit to GROBID
+ 4. convert GROBID response to JSON (and metadata)
+ 6. determine "quality"
+ 6. produce result to kafka
+
+ Returns: (grobid_output, status) (both are None or dict)
+ If grobid_output is None, error was recovered, status returned.
+ Otherwise, we were successful; grobid_output should be JSON serialized
+ and published to kafka.
+ """
+
+ #self.increment_counter('lines', 'total')
+
+ # Parse line and filter down
+ info, status = self.parse_line(raw_line)
+ if info is None:
+ #self.increment_counter('lines', status['status'])
+ return None, status
+ key = info['key']
+ if key in KEY_DENYLIST:
+ #self.increment_counter('lines', 'denylist')
+ return None, dict(status='denylist', key=key)
+
+ # Note: this may not get "cleared" correctly
+ sentry_client.extra_context(dict(row_key=key))
+
+ # Do the extraction
+ info, status = self.extract(info)
+ if info is None:
+ #self.increment_counter('lines', status['status'])
+ status['key'] = key
+ return None, status
+ extraction_status = status
+
+ # Decide what to bother inserting back into HBase
+ # Basically, don't overwrite backfill fields.
+ grobid_status_code = info.get('grobid0:status_code', None)
+ for k in list(info.keys()):
+ if k.encode('utf-8') in ('f:c', 'file:mime', 'file:cdx'):
+ info.pop(k)
+
+ # Convert fields to binary
+ for k in list(info.keys()):
+ if info[k] is None:
+ info.pop(k)
+ # NOTE: we're not actually sending these f:*, file:* keys...
+ elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+ 'grobid0:metadata'):
+ assert type(info[k]) == dict
+ info[k] = json.dumps(info[k], sort_keys=True, indent=None)
+ elif k in ('file:size', 'grobid0:status_code'):
+ # encode as int64 in network byte order
+ if info[k] != {} and info[k] != None:
+ info[k] = struct.pack('!q', info[k])
+
+ #self.increment_counter('lines', 'success')
+
+ if extraction_status is not None:
+ return info, dict(status="partial", key=key,
+ grobid_status_code=grobid_status_code,
+ reason=extraction_status['reason'])
+ else:
+ return info, dict(status="success",
+ grobid_status_code=grobid_status_code, key=key,
+ extra=extraction_status)
+
+ def run(self):
+
+ # 1. start consumer (in managed/balanced fashion, with consumer group)
+ # 2. for each thingie, do the work; if success publish to kafka; either
+ # way... print? log?
+ # 3. repeat!
+
+ kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="1.0.0")
+ produce_topic = kafka.topics[self.produce_topic]
+ consume_topic = kafka.topics[self.consume_topic]
+
+ print("starting up...")
+ sequential_failures = 0
+ with produce_topic.get_producer(compression=pykafka.common.CompressionType.GZIP, sync=False) as producer:
+ consumer = consume_topic.get_balanced_consumer(
+ consumer_group=self.consumer_group,
+ managed=True,
+ fetch_message_max_bytes=10000, # only ~10kbytes at a time
+ auto_commit_enable=True,
+ auto_commit_interval_ms=60000, # 60 seconds
+ compacted_topic=True)
+ for msg in consumer:
+ print("got a line! ")
+ grobid_output, status = self.do_work(msg.value.decode('utf-8'))
+ if grobid_output:
+ producer.produce(json.dumps(work).encode('utf-8'))
+ sequential_failures = 0
+ else:
+ print("failed to extract: {}".format(status))
+ sequential_failures += 1
+ if sequential_failures > 20:
+ print("too many failures in a row, bailing out")
+ sys.exit(-1)
+
+
+@sentry_client.capture_exceptions
+def main():
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--kafka-hosts',
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
+ parser.add_argument('--consume-topic',
+ default="sandcrawler-qa.ungrobided",
+ help="Kafka topic to consume from")
+ parser.add_argument('--produce-topic',
+ default="sandcrawler-qa.grobid-output",
+ help="Kafka topic to produce to")
+ parser.add_argument('--grobid-uri',
+ type=str,
+ default='http://localhost:8070',
+ help='URI of GROBID API Server')
+ parser.add_argument('--warc-uri-prefix',
+ type=str,
+ default='https://archive.org/serve/',
+ help='URI where WARCs can be found')
+ args = parser.parse_args()
+
+ worker = KafkaGrobidWorker(**args.__dict__)
+ worker.run()
+
+if __name__ == '__main__': # pragma: no cover
+ main()