author    Bryan Newbold <bnewbold@archive.org>  2018-08-24 18:40:39 -0700
committer Bryan Newbold <bnewbold@archive.org>  2018-08-24 18:40:39 -0700
commit    c2dbb9b5299ba7cfc4e2328ce3a4ef6c2882dc9e (patch)
tree      06b267ec0883b6f4e5bdb41413e953192ff07b12 /python/extraction_ungrobided.py
parent    1f989c851247115784d5bc877341f1e8d7ff5f98 (diff)
python extraction_ungrobided job
Diffstat (limited to 'python/extraction_ungrobided.py')
-rwxr-xr-x  python/extraction_ungrobided.py  136
1 file changed, 136 insertions, 0 deletions
diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py
new file mode 100755
index 0000000..aedf715
--- /dev/null
+++ b/python/extraction_ungrobided.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python3
+"""
+Variant of extraction_cdx_grobid which takes a partial metadata list as input
+instead of CDX.
+
+This task list is dumped by another Hadoop job which scans over the HBase table
+quickly, which allows this job to skip a (relatively) expensive HBase read
+per-row.
+
+Requires:
+- happybase
+- mrjob
+- wayback/GWB libraries
+"""
+
+# XXX: some broken MRO thing going on in here due to python3 object wrangling
+# in `wayback` library. Means we can't run pylint.
+# pylint: skip-file
+
+import xml
+import json
+import raven
+import struct
+import mrjob
+from common import parse_ungrobided_line
+from extraction_cdx_grobid import MRExtractCdxGrobid, KEY_BLACKLIST, \
+ sentry_client
+
+
+class MRExtractUnGrobided(MRExtractCdxGrobid):
+
+    # Ungrobided metadata lines in; JSON status out
+ #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'
+ #INPUT_PROTOCOL = mrjob.protocol.RawProtocol
+ INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
+ OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
+
+ def parse_line(self, raw_line):
+ """Line should be TSV and have non-null fields:
+
+ - key (string)
+ - f:c (string, json)
+ - file:mime (string)
+ - file:cdx (string, json)
+ """
+
+ if (raw_line.startswith(' ') or raw_line.startswith('#')):
+ return None, dict(status="invalid", reason="line prefix", input=raw_line)
+
+ info = parse_ungrobided_line(raw_line)
+ if info is None:
+ return None, dict(status="invalid", reason="ungrobided parse")
+
+ if info['file:mime'] not in self.mime_filter:
+ return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime'])
+
+ # If warc is not item/file.(w)arc.gz form, skip it
+ if len(info['file:cdx']['warc'].split('/')) != 2:
+ return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc'])
+
+ return info, None
+
+ @sentry_client.capture_exceptions
+ def mapper(self, _, raw_line):
+ """
+ 1. parse filtered line
+ 2. fetch data from wayback
+ 3. submit to GROBID
+ 4. convert GROBID response to JSON (and metadata)
+        5. determine "quality"
+        6. push results to hbase
+ """
+
+ self.increment_counter('lines', 'total')
+
+ # Parse line and filter down
+ info, status = self.parse_line(raw_line)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ yield _, status
+ return
+ key = info['key']
+ if key in KEY_BLACKLIST:
+ self.increment_counter('lines', 'blacklist')
+ yield _, dict(status='blacklist', key=key)
+ return
+
+ # Note: this may not get "cleared" correctly
+ sentry_client.extra_context(dict(row_key=key))
+
+ # Do the extraction
+ info, status = self.extract(info)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ status['key'] = key
+ yield _, status
+ return
+ extraction_status = status
+
+ # Decide what to bother inserting back into HBase
+ # Basically, don't overwrite backfill fields.
+ grobid_status_code = info.get('grobid0:status_code', None)
+ for k in list(info.keys()):
+            if k in ('f:c', 'file:mime', 'file:cdx'):
+ info.pop(k)
+
+ # Convert fields to binary
+ for k in list(info.keys()):
+ if info[k] is None:
+ info.pop(k)
+ # NOTE: we're not actually sending these f:*, file:* keys...
+ elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+ 'grobid0:metadata'):
+ assert type(info[k]) == dict
+ info[k] = json.dumps(info[k], sort_keys=True, indent=None)
+ elif k in ('file:size', 'grobid0:status_code'):
+ # encode as int64 in network byte order
+            if info[k] != {} and info[k] is not None:
+ info[k] = struct.pack('!q', info[k])
+
+ key = info.pop('key')
+ self.hb_table.put(key, info)
+ self.increment_counter('lines', 'success')
+
+ if extraction_status is not None:
+ yield _, dict(status="partial", key=key,
+ grobid_status_code=grobid_status_code,
+ reason=extraction_status['reason'])
+ else:
+ yield _, dict(status="success",
+ grobid_status_code=grobid_status_code, key=key,
+ extra=extraction_status)
+
+
+if __name__ == '__main__': # pragma: no cover
+    MRExtractUnGrobided.run()
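
For reference, here is a minimal standalone sketch (not part of the commit above) of the value encoding the mapper applies before the HBase put: dict-valued columns are serialized as canonical JSON strings, and integer columns are packed as signed 64-bit big-endian ("network byte order") values via struct.pack('!q', ...). The encode_row helper and the sample row contents are invented for illustration; only the column names and encodings are taken from the diff.

#!/usr/bin/env python3
import json
import struct

def encode_row(info):
    """Encode a dict of column -> value into HBase-ready str/bytes values."""
    # Drop null fields; there is no useful way to store a null cell value
    info = {k: v for k, v in info.items() if v is not None}
    for k in list(info.keys()):
        if k in ('grobid0:status', 'grobid0:tei_json', 'grobid0:metadata'):
            # Dict-valued columns become canonical (sorted-key) JSON strings
            assert isinstance(info[k], dict)
            info[k] = json.dumps(info[k], sort_keys=True, indent=None)
        elif k in ('file:size', 'grobid0:status_code'):
            # Integer columns become signed 64-bit big-endian byte strings
            info[k] = struct.pack('!q', info[k])
    return info

if __name__ == '__main__':
    row = encode_row({
        'grobid0:status_code': 200,
        'grobid0:status': {'status': 'success'},
        'grobid0:quality': None,   # dropped: null value
    })
    # struct.unpack('!q', ...) recovers the original integer
    print(struct.unpack('!q', row['grobid0:status_code'])[0])   # -> 200
    print(row['grobid0:status'])                                # -> JSON string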