diff options
Diffstat (limited to 'mapreduce/extraction_cdx_grobid.py')
-rwxr-xr-x | mapreduce/extraction_cdx_grobid.py | 56 |
1 files changed, 10 insertions, 46 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py index 27668ea..c102a59 100755 --- a/mapreduce/extraction_cdx_grobid.py +++ b/mapreduce/extraction_cdx_grobid.py @@ -24,49 +24,10 @@ from wayback.resource import Resource from wayback.resource import ArcResource from wayback.resourcestore import ResourceStore from gwb.loader import CDXLoaderFactory +from common import parse_cdx_line -def parse_cdx_line(raw_cdx): - - cdx = raw_cdx.split() - if len(cdx) < 11: - return None - - surt = cdx[0] - dt = cdx[1] - url = cdx[2] - mime = normalize_mime(cdx[3]) - http_status = cdx[4] - key = cdx[5] - c_size = cdx[8] - offset = cdx[9] - warc = cdx[10] - - if not (key.isalnum() and c_size.isdigit() and offset.isdigit() - and http_status == "200" and len(key) == 32 and dt.isdigit()): - return None - - if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): - return None - - key = "sha1:{}".format(key) - - info = dict(surt=surt, dt=dt, url=url, c_size=int(c_size), - offset=int(offset), warc=warc) - - warc_file = warc.split('/')[-1] - dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() - try: - dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() - except: - return None - - # 'i' intentionally not set - heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) - return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix} - - -class MRExtractCdxGrobid(MrJob): +class MRExtractCdxGrobid(MRJob): # CDX lines in; JSON status out INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol @@ -204,21 +165,24 @@ class MRExtractCdxGrobid(MrJob): info, status = self.parse_line(raw_cdx) if info is None: self.increment_counter('lines', status['status']) - return _, status + yield _, status + return # Check if we've already processed this line - oldrow = self.hb_table.get(info['key'], columns=['f', 'file', + oldrow = self.hb_table.row(info['key'], columns=['f', 'file', 'grobid:status_code']) - if row.get('grobid0:status', None): + if oldrow.get('grobid0:status', None): # This file has already been processed; skip it self.increment_counter('lines', 'existing') - return _, dict(status="existing") + yield _, dict(status="existing") + return # Do the extraction info, status = self.extract(info) if info is None: self.increment_counter('lines', status['status']) - return _, status + yield _, status + return # Decide what to bother inserting back into HBase # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx') |