Diffstat (limited to 'extraction/extraction_cdx_grobid.py')
-rwxr-xr-x  extraction/extraction_cdx_grobid.py  248
1 file changed, 0 insertions(+), 248 deletions(-)
diff --git a/extraction/extraction_cdx_grobid.py b/extraction/extraction_cdx_grobid.py
deleted file mode 100755
index 27668ea..0000000
--- a/extraction/extraction_cdx_grobid.py
+++ /dev/null
@@ -1,248 +0,0 @@
-#!/usr/bin/env python3
-"""
-Streaming Hadoop script to extract metadata and body text from fulltext (e.g.,
-PDF) files using GROBID. Input is a CDX file; results primarily go to HBase,
-with status written to a configurable output stream.
-
-Fulltext files are loaded directly from WARC files in petabox, instead of going
-through the wayback replay.
-
-Requires:
-- happybase
-- mrjob
-- wayback/GWB libraries
-"""
-
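-# Example invocation (a sketch: mrjob's standard CLI; the CDX file name is
-# hypothetical):
-#
-#   ./extraction_cdx_grobid.py --hbase-host localhost \
-#       --grobid-uri http://localhost:8070 example.cdx
-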
-import io
-import sys
-import json
-import struct
-import requests
-import happybase
-import mrjob
-from mrjob.job import MRJob
-from datetime import datetime
-from wayback.resource import Resource
-from wayback.resource import ArcResource
-from wayback.resourcestore import ResourceStore
-from gwb.loader import CDXLoaderFactory
-
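-def normalize_mime(raw):
-    # Minimal sketch of the mimetype normalizer, which is not defined in this
-    # file: lowercase and strip any parameters (e.g., '; charset=utf-8')
-    return raw.lower().split(';')[0].strip()
-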
-
-def parse_cdx_line(raw_cdx):
-
- cdx = raw_cdx.split()
- if len(cdx) < 11:
- return None
-
- surt = cdx[0]
- dt = cdx[1]
- url = cdx[2]
- mime = normalize_mime(cdx[3])
- http_status = cdx[4]
- key = cdx[5]
- c_size = cdx[8]
- offset = cdx[9]
- warc = cdx[10]
-
- if not (key.isalnum() and c_size.isdigit() and offset.isdigit()
- and http_status == "200" and len(key) == 32 and dt.isdigit()):
- return None
-
- if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc):
- return None
-
- key = "sha1:{}".format(key)
-
- info = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
- offset=int(offset), warc=warc)
-
-    warc_file = warc.split('/')[-1]
-    try:
-        dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
-    except ValueError:
-        return None
-
- # 'i' intentionally not set
- heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
- return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix}
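-
-# For reference, a hypothetical CDX line (wrapped here for readability) that
-# parse_cdx_line() would accept; all values are illustrative:
-#
-#   eu,eui)/pub/paper.pdf 20170828233154 https://eui.eu/pub/paper.pdf
-#     application/pdf 200 MFQM3Q2DGFHXWSVKORFFXIXBIAEROIRU - - 210251
-#     931661233 EXAMPLE-ITEM/EXAMPLE-2017-00.warc.gz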
-
-
-class MRExtractCdxGrobid(MRJob):
-
- # CDX lines in; JSON status out
- INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
- OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
-
- def configure_args(self):
- super(MRExtractCdxGrobid, self).configure_args()
-
- self.add_passthru_arg('--hbase-table',
- type=str,
- default='wbgrp-journal-extract-0-qa',
- help='HBase table to backfill into (must exist)')
- self.add_passthru_arg('--hbase-host',
- type=str,
- default='localhost',
- help='HBase thrift API host to connect to')
- self.add_passthru_arg('--grobid-uri',
- type=str,
- default='http://localhost:8070',
- help='URI of GROBID API Server')
-
- def __init__(self, *args, **kwargs):
-
- # Allow passthrough for tests
- if 'hb_table' in kwargs:
- self.hb_table = kwargs.pop('hb_table')
- else:
- self.hb_table = None
-
- super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
- self.mime_filter = ['application/pdf']
-
-    def grobid_fulltext(self, content):
-        r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
-            files={'input': content})
-        if r.status_code != 200:
-            # XXX: propagate failure as a status dict instead of printing
-            print("FAIL (GROBID: {}): {}".format(r.status_code,
-                r.content.decode('utf8')))
-        return r.json()
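-
-    # NOTE (assumption): a stock GROBID server answers processFulltextDocument
-    # with TEI XML, so r.json() above presumes a JSON-emitting wrapper sits in
-    # front of GROBID.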
-
- def mapper_init(self):
-
- if self.hb_table is None:
- try:
- host = self.options.hbase_host
- # TODO: make these configs accessible from... mrconf.cfg?
- hb_conn = happybase.Connection(host=host, transport="framed",
- protocol="compact")
- except Exception as err:
- raise Exception("Couldn't connect to HBase using host: {}".format(host))
-            self.hb_table = hb_conn.table(self.options.hbase_table)
-
- def parse_line(self, raw_cdx):
-
- if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
- raw_cdx.startswith('#')):
- return None, dict(status="invalid", reason="line prefix")
-
- info = parse_cdx_line(raw_cdx)
- if info is None:
- return None, dict(status="invalid", reason="CDX parse")
-
- if info['file:mime'] not in self.mime_filter:
- self.increment_counter('lines', 'skip')
- return None, dict(status="skip", reason="mimetype")
-
- # If warc is not item/file.(w)arc.gz form, skip it
- if len(info['file:cdx']['warc'].split('/')) != 2:
- self.increment_counter('lines', 'skip')
- return None, dict(status="skip", reason="WARC path not petabox item/file")
-
- return info, None
-
- def extract(self, info):
-
- # Fetch data from WARCs in petabox
- try:
- rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
- gwb_record = rstore.load_resource(
- info['file:cdx']['warc'],
- info['file:cdx']['offset'],
- info['file:cdx']['c_size'])
-        except IOError as ioe:
-            # XXX: catch correct error
-            self.increment_counter('lines', 'error')
-            return None, dict(status="error", reason="WARC fetch")
-
-        if gwb_record.get_status()[0] != 200:
-            self.increment_counter('lines', 'error')
-            return None, dict(status="error", reason="non-HTTP-200 WARC content")
-
- # Submit to GROBID
-        content = gwb_record.open_raw_content().read()
-        try:
-            grobid_result = self.grobid_fulltext(content)
-        except IOError as ioe:
-            # XXX: catch correct error
-            self.increment_counter('lines', 'error')
-            return None, dict(status="error", reason="GROBID connection")
-
-        info['file:size'] = len(content)
-
- info['grobid0:status_code'] = None
- info['grobid0:quality'] = None
- info['grobid0:status'] = {}
- info['grobid0:tei_xml'] = None
- info['grobid0:tei_json'] = {}
- info['grobid0:metadata'] = {}
-
- # Convert TEI XML to JSON
- # TODO
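-        #
-        # A minimal sketch of what this conversion might look like, assuming
-        # grobid_result holds a TEI XML string (the 'title' key is a
-        # hypothetical choice):
-        #
-        #   import xml.etree.ElementTree as ET
-        #   ns = {'tei': 'http://www.tei-c.org/ns/1.0'}
-        #   tree = ET.fromstring(grobid_result)
-        #   info['grobid0:metadata'] = dict(
-        #       title=tree.findtext('.//tei:titleStmt/tei:title', namespaces=ns))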
-
- # Determine extraction "quality"
- # TODO
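-        #
-        # One plausible heuristic, as a sketch (the quality labels are made up):
-        #
-        #   md = info['grobid0:metadata']
-        #   info['grobid0:quality'] = 'good' if md.get('title') else 'poor'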
-
- return info, None
-
- def mapper(self, _, raw_cdx):
- """
- 1. parse CDX line
- 2. check what is already in hbase
- 3. fetch data from wayback
- 4. submit to GROBID
- 5. convert GROBID response to JSON (and metadata)
- 6. determine "quality"
- 7. push results to hbase
- """
-
- self.increment_counter('lines', 'total')
-
- # Parse line and filter down
- info, status = self.parse_line(raw_cdx)
-        if info is None:
-            self.increment_counter('lines', status['status'])
-            yield _, status
-            return
-
- # Check if we've already processed this line
-        oldrow = self.hb_table.get(info['key'], columns=['f', 'file',
-            'grobid0:status_code'])
-        if oldrow.get(b'grobid0:status_code', None) is not None:
-            # This file has already been processed; skip it
-            self.increment_counter('lines', 'existing')
-            yield _, dict(status="existing")
-            return
-
- # Do the extraction
- info, status = self.extract(info)
-        if info is None:
-            self.increment_counter('lines', status['status'])
-            yield _, status
-            return
-
- # Decide what to bother inserting back into HBase
- # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
- grobid_status = info.get('grobid0:status_code', None)
-        for k in list(info.keys()):
-            # happybase returns row dicts keyed by bytes
-            if k.encode('utf-8') in oldrow:
-                info.pop(k)
-
- # Convert fields to binary
-        for k in list(info.keys()):
-            if k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
-                    'grobid0:metadata'):
-                assert type(info[k]) == dict
-                info[k] = json.dumps(info[k], sort_keys=True, indent=None)
-            if k in ('file:size', 'grobid0:status_code'):
-                # encode as int64 in network byte order
-                if info[k] is not None:
-                    info[k] = struct.pack('!q', info[k])
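-                    # (readers recover the integer with struct.unpack('!q', v)[0],
-                    # where v is the raw cell value from HBase)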
-
- key = info.pop('key')
- self.hb_table.put(key, info)
- self.increment_counter('lines', 'success')
-
- yield _, dict(status="success", grobid_status=grobid_status)
-
-if __name__ == '__main__':
- MRExtractCdxGrobid.run()
-