aboutsummaryrefslogtreecommitdiffstats
path: root/extraction/extraction_cdx_grobid.py
diff options
context:
space:
mode:
Diffstat (limited to 'extraction/extraction_cdx_grobid.py')
-rwxr-xr-xextraction/extraction_cdx_grobid.py248
1 files changed, 248 insertions, 0 deletions
diff --git a/extraction/extraction_cdx_grobid.py b/extraction/extraction_cdx_grobid.py
new file mode 100755
index 0000000..54d8b71
--- /dev/null
+++ b/extraction/extraction_cdx_grobid.py
@@ -0,0 +1,248 @@
+#!/usr/bin/env python3
+"""
+Streaming Hadoop script to import extract metadata and body from fulltext (eg,
+PDF) files using GROBID. Input is a CDX file; results primarly go to HBase,
+with status written to configurable output stream.
+
+Fulltext files are loaded directly from WARC files in petabox, instead of going
+through the wayback replay.
+
+Requires:
+- happybase
+- mrjob
+- wayback/GWB libraries
+"""
+
+import io
+import sys
+import struct
+import requests
+import happybase
+import mrjob
+from mrjob.job import MRJob
+from wayback.resource import Resource
+from wayback.resource import ArcResource
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory
+
+
+def parse_cdx_line(raw_cdx):
+
+ cdx = raw_cdx.split()
+ if len(cdx) < 11:
+ return None
+
+ surt = cdx[0]
+ dt = cdx[1]
+ url = cdx[2]
+ mime = normalize_mime(cdx[3])
+ http_status = cdx[4]
+ key = cdx[5]
+ c_size = cdx[8]
+ offset = cdx[9]
+ warc = cdx[10]
+
+ if not (key.isalnum() and c_size.isdigit() and offset.isdigit()
+ and http_status == "200" and len(key) == 32 and dt.isdigit()):
+ return None
+
+ if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc):
+ return None
+
+ key = "sha1:{}".format(key)
+
+ info = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
+ offset=int(offset), warc=warc)
+
+ warc_file = warc.split('/')[-1]
+ dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+ try:
+ dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+ except:
+ return None
+
+ # 'i' intentionally not set
+ heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
+ return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix}
+
+
+class MRExtractCdxGrobid(MrJob):
+
+ # CDX lines in; JSON status out
+ INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
+ OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
+
+ def configure_args(self):
+ super(MRExtractCdxGrobid, self).configure_args()
+
+ self.add_passthru_arg('--hbase-table',
+ type=str,
+ default='wbgrp-journal-extract-0-qa',
+ help='HBase table to backfill into (must exist)')
+ self.add_passthru_arg('--hbase-host',
+ type=str,
+ default='localhost',
+ help='HBase thrift API host to connect to')
+ self.add_passthru_arg('--grobid-uri',
+ type=str,
+ default='http://localhost:8070',
+ help='URI of GROBID API Server')
+
+ def __init__(self, *args, **kwargs):
+
+ # Allow passthrough for tests
+ if 'hb_table' in kwargs:
+ self.hb_table = kwargs.pop('hb_table')
+ else:
+ self.hb_table = None
+
+ super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
+ self.mime_filter = ['application/pdf']
+
+ def grobid_fulltext(self, content):
+ r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
+ files={'input': content})
+ if r.status_code is not 200:
+ # XXX:
+ print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8')))
+ else:
+ # XXX:
+ print("SUCCESS: " + debug_line)
+ return r.json()
+
+ def mapper_init(self):
+
+ if self.hb_table is None:
+ try:
+ host = self.options.hbase_host
+ # TODO: make these configs accessible from... mrconf.cfg?
+ hb_conn = happybase.Connection(host=host, transport="framed",
+ protocol="compact")
+ except Exception as err:
+ raise Exception("Couldn't connect to HBase using host: {}".format(host))
+ self.hb_table = hb_conn.table(self.options.hbase_table)
+
+ def parse_line(self, raw_cdx):
+
+ if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
+ raw_cdx.startswith('#')):
+ return None, dict(status="invalid", reason="line prefix")
+
+ info = parse_cdx_line(raw_cdx)
+ if info is None:
+ return None, dict(status="invalid", reason="CDX parse")
+
+ if info['file:mime'] not in self.mime_filter:
+ self.increment_counter('lines', 'skip')
+ return None, dict(status="skip", reason="mimetype")
+
+ # If warc is not item/file.(w)arc.gz form, skip it
+ if len(info['file:cdx']['warc'].split('/')) != 2:
+ self.increment_counter('lines', 'skip')
+ return None, dict(status="skip", reason="WARC path not petabox item/file")
+
+ return info, None
+
+ def extract(self, info):
+
+ # Fetch data from WARCs in petabox
+ try:
+ rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
+ gwb_record = rstore.load_resource(
+ info['file:cdx']['warc'],
+ info['file:cdx']['offset'],
+ info['file:cdx']['c_size'])
+ except IOError as ioe:
+ # XXX: catch correct error
+ self.increment_counter('lines', 'existing')
+ return _, dict(status="existing")
+
+ if gwb_record.get_status()[0] != 200:
+ self.increment_counter('lines', 'error')
+ return _, dict(status="error", reason="non-HTTP-200 WARC content")
+
+ # Submit to GROBID
+ content = gwb_record.open_raw_content()
+ try:
+ grobid_result = self.grobid_fulltext(gwb_record.open_raw_content())
+ except IOError as ioe:
+ # XXX: catch correct error
+ self.increment_counter('lines', 'existing')
+ return _, dict(status="existing")
+
+ info['file:size'] = len(resource_data)
+
+ info['grobid0:status_code'] = None
+ info['grobid0:quality'] = None
+ info['grobid0:status'] = {}
+ info['grobid0:tei_xml'] = None
+ info['grobid0:tei_json'] = {}
+ info['grobid0:metadata'] = {}
+
+ # Convert TEI XML to JSON
+ # TODO
+
+ # Determine extraction "quality"
+ # TODO
+
+ return info, None
+
+ def mapper(self, _, raw_cdx):
+ """
+ 1. parse CDX line
+ 2. check what is already in hbase
+ 3. fetch data from wayback
+ 4. submit to GROBID
+ 5. convert GROBID response to JSON (and metadata)
+ 6. determine "quality"
+ 7. push results to hbase
+ """
+
+ self.increment_counter('lines', 'total')
+
+ # Parse line and filter down
+ info, status = self.parse_line(raw_cdx)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ return _, status
+
+ # Check if we've already processed this line
+ oldrow = self.hb_table.get(info['key'], columns=['f', 'file',
+ 'grobid:status_code'])
+ if row.get('grobid0:status', None):
+ # This file has already been processed; skip it
+ self.increment_counter('lines', 'existing')
+ return _, dict(status="existing")
+
+ # Do the extraction
+ info, status = self.extract(info)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ return _, status
+
+ # Decide what to bother inserting back into HBase
+ # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
+ grobid_status = info.get('grobid0:status_code', None)
+ for k in info.keys():
+ if k in oldrow:
+ info.pop(k)
+
+ # Convert fields to binary
+ for k in info.keys():
+ if k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+ 'grobid0:metadata'):
+ assert type(info[k]) == dict
+ info[k] = json.dumps(info[k], sort_keys=True, indent=None)
+ if k in ('file:size', 'grobid0:status_code'):
+ # encode as int64 in network byte order
+ info[k] = struct.pack('!q', info[k])
+
+ key = info.pop('key')
+ self.hb_table.put(key, info)
+ self.increment_counter('lines', 'success')
+
+ yield _, dict(status="success", grobid_status=grobid_status)
+
+if __name__ == '__main__':
+ MRExtractCdxGrobid.run()
+