diff options
Diffstat (limited to 'mapreduce/backfill_hbase_from_cdx.py')
-rwxr-xr-x | mapreduce/backfill_hbase_from_cdx.py | 98 |
1 files changed, 2 insertions, 96 deletions
diff --git a/mapreduce/backfill_hbase_from_cdx.py b/mapreduce/backfill_hbase_from_cdx.py index fe37bd5..8a28ec1 100755 --- a/mapreduce/backfill_hbase_from_cdx.py +++ b/mapreduce/backfill_hbase_from_cdx.py @@ -18,104 +18,10 @@ TODO: import sys import json -from datetime import datetime import happybase import mrjob from mrjob.job import MRJob - -NORMAL_MIME = ( - 'application/pdf', - 'application/postscript', - 'text/html', - 'text/xml', -) - -def normalize_mime(raw): - raw = raw.lower() - for norm in NORMAL_MIME: - if raw.startswith(norm): - return norm - - # Special cases - if raw.startswith('application/xml'): - return 'text/xml' - if raw.startswith('application/x-pdf'): - return 'application/pdf' - return None - -def test_normalize_mime(): - assert normalize_mime("asdf") == None - assert normalize_mime("application/pdf") == "application/pdf" - assert normalize_mime("application/pdf+journal") == "application/pdf" - assert normalize_mime("Application/PDF") == "application/pdf" - assert normalize_mime("application/p") == None - assert normalize_mime("application/xml+stuff") == "text/xml" - -def transform_line(raw_cdx): - - cdx = raw_cdx.split() - if len(cdx) < 11: - return None - - surt = cdx[0] - dt = cdx[1] - url = cdx[2] - mime = normalize_mime(cdx[3]) - http_status = cdx[4] - key = cdx[5] - c_size = cdx[8] - offset = cdx[9] - warc = cdx[10] - - if not (key.isalnum() and c_size.isdigit() and offset.isdigit() - and http_status == "200" and len(key) == 32 and dt.isdigit()): - return None - - if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): - return None - - key = "sha1:{}".format(key) - - info = dict(surt=surt, dt=dt, url=url, c_size=int(c_size), - offset=int(offset), warc=warc) - - warc_file = warc.split('/')[-1] - dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() - try: - dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() - except: - return None - - # 'i' intentionally not set - heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) - return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix} - -def test_transform_line(): - - raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz" - correct = { - 'key': "sha1:WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G", - 'file:mime': "application/pdf", - 'file:cdx': { - 'surt': "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", - 'url': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", - 'dt': "20170828233154", - 'warc': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", - 'offset': 931661233, - 'c_size': 210251, - }, - 'f:c': { - 'u': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", - 'd': "2017-08-28T23:31:54", - 'f': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", - 'o': 931661233, - 'c': 1, - } - } - - assert transform_line(raw) == correct - assert transform_line(raw + "\n") == correct - assert transform_line(raw + " extra_field") == correct +from common import parse_cdx_line class MRCDXBackfillHBase(MRJob): @@ -171,7 +77,7 @@ class MRCDXBackfillHBase(MRJob): self.increment_counter('lines', 'invalid') return _, dict(status="invalid") - info = transform_line(raw_cdx) + info = parse_cdx_line(raw_cdx) if info is None: self.increment_counter('lines', 'invalid') return _, dict(status="invalid") |