diff options
Diffstat (limited to 'mapreduce/backfill_hbase_from_cdx.py')
-rwxr-xr-x | mapreduce/backfill_hbase_from_cdx.py | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/mapreduce/backfill_hbase_from_cdx.py b/mapreduce/backfill_hbase_from_cdx.py new file mode 100755 index 0000000..fe37bd5 --- /dev/null +++ b/mapreduce/backfill_hbase_from_cdx.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +""" +Streaming Hadoop script to import CDX metadata into the HBase fulltext table, +primarily for URL-agnostic crawl de-duplication. Takes only "fulltext" file +formats. + +Requires: +- happybase +- mrjob + +TODO: +- argparse +- refactor into an object +- tests in separate file +- nose tests +- sentry integration for error reporting +""" + +import sys +import json +from datetime import datetime +import happybase +import mrjob +from mrjob.job import MRJob + +NORMAL_MIME = ( + 'application/pdf', + 'application/postscript', + 'text/html', + 'text/xml', +) + +def normalize_mime(raw): + raw = raw.lower() + for norm in NORMAL_MIME: + if raw.startswith(norm): + return norm + + # Special cases + if raw.startswith('application/xml'): + return 'text/xml' + if raw.startswith('application/x-pdf'): + return 'application/pdf' + return None + +def test_normalize_mime(): + assert normalize_mime("asdf") == None + assert normalize_mime("application/pdf") == "application/pdf" + assert normalize_mime("application/pdf+journal") == "application/pdf" + assert normalize_mime("Application/PDF") == "application/pdf" + assert normalize_mime("application/p") == None + assert normalize_mime("application/xml+stuff") == "text/xml" + +def transform_line(raw_cdx): + + cdx = raw_cdx.split() + if len(cdx) < 11: + return None + + surt = cdx[0] + dt = cdx[1] + url = cdx[2] + mime = normalize_mime(cdx[3]) + http_status = cdx[4] + key = cdx[5] + c_size = cdx[8] + offset = cdx[9] + warc = cdx[10] + + if not (key.isalnum() and c_size.isdigit() and offset.isdigit() + and http_status == "200" and len(key) == 32 and dt.isdigit()): + return None + + if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): + return None + + key = "sha1:{}".format(key) + + info = dict(surt=surt, dt=dt, url=url, c_size=int(c_size), + offset=int(offset), warc=warc) + + warc_file = warc.split('/')[-1] + dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() + try: + dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() + except: + return None + + # 'i' intentionally not set + heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) + return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix} + +def test_transform_line(): + + raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz" + correct = { + 'key': "sha1:WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G", + 'file:mime': "application/pdf", + 'file:cdx': { + 'surt': "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", + 'url': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", + 'dt': "20170828233154", + 'warc': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", + 'offset': 931661233, + 'c_size': 210251, + }, + 'f:c': { + 'u': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", + 'd': "2017-08-28T23:31:54", + 'f': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", + 'o': 931661233, + 'c': 1, + } + } + + assert transform_line(raw) == correct + assert transform_line(raw + "\n") == correct + assert transform_line(raw + " extra_field") == correct + + +class MRCDXBackfillHBase(MRJob): + + # CDX lines in; JSON status out + INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol + OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol + + def configure_args(self): + super(MRCDXBackfillHBase, self).configure_args() + + self.add_passthru_arg('--hbase-table', + type=str, + default='wbgrp-journal-extract-0-qa', + help='HBase table to backfill into (must exist)') + self.add_passthru_arg('--hbase-host', + type=str, + default='localhost', + help='HBase thrift API host to connect to') + + def __init__(self, *args, **kwargs): + + # Allow passthrough for tests + if 'hb_table' in kwargs: + self.hb_table = kwargs.pop('hb_table') + else: + self.hb_table = None + + super(MRCDXBackfillHBase, self).__init__(*args, **kwargs) + self.mime_filter = ['application/pdf'] + + def mapper_init(self): + + if self.hb_table is None: + try: + host = self.options.hbase_host + # TODO: make these configs accessible from... mrconf.cfg? + hb_conn = happybase.Connection(host=host, transport="framed", + protocol="compact") + except Exception as err: + raise Exception("Couldn't connect to HBase using host: {}".format(host)) + self.hb_table = hb_conn.table(self.options.hbase_table) + + def mapper(self, _, raw_cdx): + + self.increment_counter('lines', 'total') + + if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or + raw_cdx.startswith('#')): + + # Skip line + # XXX: tests don't cover this path; need coverage! + self.increment_counter('lines', 'invalid') + return _, dict(status="invalid") + + info = transform_line(raw_cdx) + if info is None: + self.increment_counter('lines', 'invalid') + return _, dict(status="invalid") + + if info['file:mime'] not in self.mime_filter: + self.increment_counter('lines', 'skip') + return _, dict(status="skip") + + key = info.pop('key') + info['f:c'] = json.dumps(info['f:c'], sort_keys=True, indent=None) + info['file:cdx'] = json.dumps(info['file:cdx'], sort_keys=True, + indent=None) + + self.hb_table.put(key, info) + self.increment_counter('lines', 'success') + + yield _, dict(status="success") + +if __name__ == '__main__': + MRCDXBackfillHBase.run() + |