aboutsummaryrefslogtreecommitdiffstats
path: root/mapreduce/extraction_cdx_grobid.py
diff options
context:
space:
mode:
Diffstat (limited to 'mapreduce/extraction_cdx_grobid.py')
-rwxr-xr-xmapreduce/extraction_cdx_grobid.py56
1 files changed, 10 insertions, 46 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index 27668ea..c102a59 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -24,49 +24,10 @@ from wayback.resource import Resource
from wayback.resource import ArcResource
from wayback.resourcestore import ResourceStore
from gwb.loader import CDXLoaderFactory
+from common import parse_cdx_line
-def parse_cdx_line(raw_cdx):
-
- cdx = raw_cdx.split()
- if len(cdx) < 11:
- return None
-
- surt = cdx[0]
- dt = cdx[1]
- url = cdx[2]
- mime = normalize_mime(cdx[3])
- http_status = cdx[4]
- key = cdx[5]
- c_size = cdx[8]
- offset = cdx[9]
- warc = cdx[10]
-
- if not (key.isalnum() and c_size.isdigit() and offset.isdigit()
- and http_status == "200" and len(key) == 32 and dt.isdigit()):
- return None
-
- if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc):
- return None
-
- key = "sha1:{}".format(key)
-
- info = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
- offset=int(offset), warc=warc)
-
- warc_file = warc.split('/')[-1]
- dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
- try:
- dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
- except:
- return None
-
- # 'i' intentionally not set
- heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
- return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix}
-
-
-class MRExtractCdxGrobid(MrJob):
+class MRExtractCdxGrobid(MRJob):
# CDX lines in; JSON status out
INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
@@ -204,21 +165,24 @@ class MRExtractCdxGrobid(MrJob):
info, status = self.parse_line(raw_cdx)
if info is None:
self.increment_counter('lines', status['status'])
- return _, status
+ yield _, status
+ return
# Check if we've already processed this line
- oldrow = self.hb_table.get(info['key'], columns=['f', 'file',
+ oldrow = self.hb_table.row(info['key'], columns=['f', 'file',
'grobid:status_code'])
- if row.get('grobid0:status', None):
+ if oldrow.get('grobid0:status', None):
# This file has already been processed; skip it
self.increment_counter('lines', 'existing')
- return _, dict(status="existing")
+ yield _, dict(status="existing")
+ return
# Do the extraction
info, status = self.extract(info)
if info is None:
self.increment_counter('lines', status['status'])
- return _, status
+ yield _, status
+ return
# Decide what to bother inserting back into HBase
# Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')