about | summary | refs | log | tree | commit | diff | stats
path: root/mapreduce/extraction_cdx_grobid.py
diff options
context:
space:
mode:
Diffstat (limited to 'mapreduce/extraction_cdx_grobid.py')
-rwxr-xr-x  mapreduce/extraction_cdx_grobid.py  16
1 files changed, 12 insertions, 4 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index db36cac..c29b27e 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -189,8 +189,8 @@ class MRExtractCdxGrobid(MRJob):
key = info['key']
# Check if we've already processed this line
- oldrow = self.hb_table.row(key, columns=[b'f', b'file',
- b'grobid0:status_code'])
+ oldrow = self.hb_table.row(key,
+ columns=[b'f:c', b'file', b'grobid0:status_code'])
if oldrow.get(b'grobid0:status_code', None) != None:
# This file has already been processed; skip it
self.increment_counter('lines', 'existing')
@@ -204,10 +204,11 @@ class MRExtractCdxGrobid(MRJob):
status['key'] = key
yield _, status
return
+ extraction_status = status
# Decide what to bother inserting back into HBase
# Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
- grobid_status = info.get('grobid0:status_code', None)
+ grobid_status_code = info.get('grobid0:status_code', None)
for k in list(info.keys()):
if k.encode('utf-8') in oldrow:
info.pop(k)
@@ -229,7 +230,14 @@ class MRExtractCdxGrobid(MRJob):
self.hb_table.put(key, info)
self.increment_counter('lines', 'success')
- yield _, dict(status="success", grobid_status=grobid_status, key=key)
+ if extraction_status is not None:
+ yield _, dict(status="partial", key=key,
+ grobid_status_code=grobid_status_code,
+ reason=extraction_status['reason'])
+ else:
+ yield _, dict(status="success",
+ grobid_status_code=grobid_status_code, key=key,
+ extra=extraction_status)
if __name__ == '__main__': # pragma: no cover