path: root/python_hadoop/extraction_cdx_grobid.py
diff options
Diffstat (limited to 'python_hadoop/extraction_cdx_grobid.py')
1 files changed, 299 insertions, 0 deletions
diff --git a/python_hadoop/extraction_cdx_grobid.py b/python_hadoop/extraction_cdx_grobid.py
new file mode 100755
index 0000000..88580e1
--- /dev/null
+++ b/python_hadoop/extraction_cdx_grobid.py
@@ -0,0 +1,299 @@
+#!/usr/bin/env python3
+Streaming Hadoop script to import extract metadata and body from fulltext (eg,
+PDF) files using GROBID. Input is a CDX file; results primarly go to HBase,
+with status written to configurable output stream.
+Fulltext files are loaded directly from WARC files in petabox, instead of going
+through the wayback replay.
+- happybase
+- mrjob
+- wayback/GWB libraries
+# XXX: some broken MRO thing going on in here due to python3 object wrangling
+# in `wayback` library. Means we can't run pylint.
+# pylint: skip-file
+import os
+import xml
+import json
+import raven
+import struct
+import requests
+import happybase
+import mrjob
+from mrjob.job import MRJob
+import wayback.exception
+from http.client import IncompleteRead
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory
+from common import parse_cdx_line
+from grobid2json import teixml2json
+# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
+sentry_client = raven.Client()
+# Specific poison-pill rows we should skip
+ 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format"
+class MRExtractCdxGrobid(MRJob):
+ # CDX lines in; JSON status out
+ #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'
+ #INPUT_PROTOCOL = mrjob.protocol.RawProtocol
+ INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
+ OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
+ def configure_args(self):
+ super(MRExtractCdxGrobid, self).configure_args()
+ self.add_passthru_arg('--hbase-table',
+ type=str,
+ default='wbgrp-journal-extract-0-qa',
+ help='HBase table to backfill into (must exist)')
+ self.add_passthru_arg('--hbase-host',
+ type=str,
+ default='localhost',
+ help='HBase thrift API host to connect to')
+ self.add_passthru_arg('--grobid-uri',
+ type=str,
+ default='http://localhost:8070',
+ help='URI of GROBID API Server')
+ self.add_passthru_arg('--warc-uri-prefix',
+ type=str,
+ default='https://archive.org/serve/',
+ help='URI where WARCs can be found')
+ self.add_passthru_arg('--force-existing',
+ action="store_true",
+ help='Re-processes (with GROBID) existing lines')
+ def __init__(self, *args, **kwargs):
+ super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
+ self.hb_table = None
+ self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
+ self.mime_filter = ['application/pdf']
+ self.rstore = None
+ def grobid_process_fulltext(self, content):
+ r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
+ files={'input': content})
+ return r
+ def mapper_init(self):
+ if self.hb_table:
+ return
+ sentry_client.tags_context(dict(hbase_table=self.options.hbase_table))
+ try:
+ host = self.options.hbase_host
+ # TODO: make these configs accessible from... mrconf.cfg?
+ hb_conn = happybase.Connection(host=host, transport="framed",
+ protocol="compact")
+ except Exception:
+ raise Exception("Couldn't connect to HBase using host: {}".format(host))
+ self.hb_table = hb_conn.table(self.options.hbase_table)
+ def parse_line(self, raw_cdx):
+ if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
+ raw_cdx.startswith('#')):
+ return None, dict(status="invalid", reason="line prefix")
+ info = parse_cdx_line(raw_cdx)
+ if info is None:
+ return None, dict(status="invalid", reason="CDX parse")
+ if info['file:mime'] not in self.mime_filter:
+ return None, dict(status="skip", reason="mimetype")
+ # If warc is not item/file.(w)arc.gz form, skip it
+ if len(info['file:cdx']['warc'].split('/')) != 2:
+ return None, dict(status="skip", reason="WARC path not petabox item/file")
+ return info, None
+ def fetch_warc_content(self, warc_path, offset, c_size):
+ warc_uri = self.options.warc_uri_prefix + warc_path
+ if not self.rstore:
+ self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
+ webdata_secret=self.petabox_webdata_secret,
+ download_base_url=self.options.warc_uri_prefix))
+ try:
+ gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
+ except wayback.exception.ResourceUnavailable:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox (ResourceUnavailable)")
+ except ValueError as ve:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
+ except EOFError as eofe:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
+ except TypeError as te:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
+ # Note: could consider a generic "except Exception" here, as we get so
+ # many petabox errors. Do want jobs to fail loud and clear when the
+ # whole cluster is down though.
+ if gwb_record.get_status()[0] != 200:
+ return None, dict(status="error",
+ reason="archived HTTP response (WARC) was not 200",
+ warc_status=gwb_record.get_status()[0])
+ try:
+ raw_content = gwb_record.open_raw_content().read()
+ except IncompleteRead as ire:
+ return None, dict(status="error",
+ reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
+ return raw_content, None
+ def extract(self, info):
+ # Fetch data from WARCs in petabox
+ original_content, status = self.fetch_warc_content(
+ info['file:cdx']['warc'],
+ info['file:cdx']['offset'],
+ info['file:cdx']['c_size'])
+ if status:
+ return None, status
+ info['file:size'] = len(original_content)
+ # Submit to GROBID
+ try:
+ grobid_response = self.grobid_process_fulltext(original_content)
+ except requests.exceptions.ConnectionError:
+ return None, dict(status="error", reason="connection to GROBID worker")
+ info['grobid0:status_code'] = grobid_response.status_code
+ # 4 MByte XML size limit; don't record GROBID status on this path
+ if len(grobid_response.content) > 4000000:
+ info['grobid0:status'] = {'status': 'oversize'}
+ return info, dict(status="oversize", reason="TEI response was too large")
+ if grobid_response.status_code != 200:
+ # response.text is .content decoded as utf-8
+ info['grobid0:status'] = dict(status='error', description=grobid_response.text)
+ return info, dict(status="error", reason="non-200 GROBID HTTP status",
+ extra=grobid_response.text)
+ info['grobid0:status'] = {'status': 'partial'}
+ info['grobid0:tei_xml'] = grobid_response.content
+ # Convert TEI XML to JSON
+ try:
+ info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
+ except xml.etree.ElementTree.ParseError:
+ info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
+ return info, info['grobid0:status']
+ except ValueError:
+ info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
+ return info, info['grobid0:status']
+ tei_metadata = info['grobid0:tei_json'].copy()
+ for k in ('body', 'annex'):
+ # Remove fulltext (copywritted) content
+ tei_metadata.pop(k, None)
+ info['grobid0:metadata'] = tei_metadata
+ # Determine extraction "quality"
+ # TODO:
+ info['grobid0:quality'] = None
+ info['grobid0:status'] = {'status': 'success'}
+ return info, None
+ @sentry_client.capture_exceptions
+ def mapper(self, _, raw_cdx):
+ """
+ 1. parse CDX line
+ 2. check what is already in hbase
+ 3. fetch data from wayback
+ 4. submit to GROBID
+ 5. convert GROBID response to JSON (and metadata)
+ 6. determine "quality"
+ 7. push results to hbase
+ """
+ self.increment_counter('lines', 'total')
+ # Parse line and filter down
+ info, status = self.parse_line(raw_cdx)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ yield _, status
+ return
+ key = info['key']
+ if key in KEY_DENYLIST:
+ self.increment_counter('lines', 'denylist')
+ yield _, dict(status='denylist', key=key)
+ return
+ # Note: this may not get "cleared" correctly
+ sentry_client.extra_context(dict(row_key=key))
+ # Check if we've already processed this line
+ oldrow = self.hb_table.row(key,
+ columns=[b'f:c', b'file', b'grobid0:status_code'])
+ if (oldrow.get(b'grobid0:status_code', None) != None
+ and not self.options.force_existing):
+ # This file has already been processed; skip it
+ self.increment_counter('lines', 'existing')
+ yield _, dict(status="existing", key=key)
+ return
+ # Do the extraction
+ info, status = self.extract(info)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ status['key'] = key
+ yield _, status
+ return
+ extraction_status = status
+ # Decide what to bother inserting back into HBase
+ # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
+ grobid_status_code = info.get('grobid0:status_code', None)
+ for k in list(info.keys()):
+ if k.encode('utf-8') in oldrow:
+ info.pop(k)
+ # Convert fields to binary
+ for k in list(info.keys()):
+ if info[k] is None:
+ info.pop(k)
+ elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+ 'grobid0:metadata'):
+ assert type(info[k]) == dict
+ info[k] = json.dumps(info[k], sort_keys=True, indent=None)
+ elif k in ('file:size', 'grobid0:status_code'):
+ # encode as int64 in network byte order
+ if info[k] != {} and info[k] != None:
+ info[k] = struct.pack('!q', info[k])
+ key = info.pop('key')
+ self.hb_table.put(key, info)
+ self.increment_counter('lines', 'success')
+ if extraction_status is not None:
+ yield _, dict(status="partial", key=key,
+ grobid_status_code=grobid_status_code,
+ reason=extraction_status['reason'])
+ else:
+ yield _, dict(status="success",
+ grobid_status_code=grobid_status_code, key=key,
+ extra=extraction_status)
+if __name__ == '__main__': # pragma: no cover
+ MRExtractCdxGrobid.run()