diff options
Diffstat (limited to 'extraction/backfill_hbase_from_cdx.py')
| -rwxr-xr-x | extraction/backfill_hbase_from_cdx.py | 195 | 
1 files changed, 0 insertions, 195 deletions
| diff --git a/extraction/backfill_hbase_from_cdx.py b/extraction/backfill_hbase_from_cdx.py deleted file mode 100755 index fe37bd5..0000000 --- a/extraction/backfill_hbase_from_cdx.py +++ /dev/null @@ -1,195 +0,0 @@ -#!/usr/bin/env python3 -""" -Streaming Hadoop script to import CDX metadata into the HBase fulltext table, -primarily for URL-agnostic crawl de-duplication. Takes only "fulltext" file -formats. - -Requires: -- happybase -- mrjob - -TODO: -- argparse -- refactor into an object -- tests in separate file -- nose tests -- sentry integration for error reporting -""" - -import sys -import json -from datetime import datetime -import happybase -import mrjob -from mrjob.job import MRJob - -NORMAL_MIME = ( -    'application/pdf', -    'application/postscript', -    'text/html', -    'text/xml', -) - -def normalize_mime(raw): -    raw = raw.lower() -    for norm in NORMAL_MIME: -        if raw.startswith(norm): -            return norm - -    # Special cases  -    if raw.startswith('application/xml'): -        return 'text/xml' -    if raw.startswith('application/x-pdf'): -        return 'application/pdf' -    return None - -def test_normalize_mime(): -    assert normalize_mime("asdf") == None -    assert normalize_mime("application/pdf") == "application/pdf" -    assert normalize_mime("application/pdf+journal") == "application/pdf" -    assert normalize_mime("Application/PDF") == "application/pdf" -    assert normalize_mime("application/p") == None -    assert normalize_mime("application/xml+stuff") == "text/xml" - -def transform_line(raw_cdx): - -    cdx = raw_cdx.split() -    if len(cdx) < 11: -        return None - -    surt = cdx[0] -    dt = cdx[1] -    url = cdx[2] -    mime = normalize_mime(cdx[3]) -    http_status = cdx[4] -    key = cdx[5] -    c_size = cdx[8] -    offset = cdx[9] -    warc = cdx[10] - -    if not (key.isalnum() and c_size.isdigit() and offset.isdigit() -            and http_status == "200" and len(key) == 32 and dt.isdigit()): -        return None - -    if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): -        return None - -    key = "sha1:{}".format(key) - -    info = dict(surt=surt, dt=dt, url=url, c_size=int(c_size), -        offset=int(offset), warc=warc) - -    warc_file = warc.split('/')[-1] -    dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() -    try: -        dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() -    except: -        return None - -    # 'i' intentionally not set -    heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) -    return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix} - -def test_transform_line(): - -    raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz" -    correct = { -        'key': "sha1:WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G", -        'file:mime': "application/pdf", -        'file:cdx': { -            'surt': "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", -            'url': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", -            'dt': "20170828233154", -            'warc': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", -            'offset': 931661233, -            'c_size': 210251, -        }, -        'f:c': { -            'u': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", -            'd': "2017-08-28T23:31:54", -            'f': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", -            'o': 931661233, -            'c': 1, -        } -    } - -    assert transform_line(raw) == correct -    assert transform_line(raw + "\n") == correct -    assert transform_line(raw + " extra_field") == correct - - -class MRCDXBackfillHBase(MRJob): - -    # CDX lines in; JSON status out -    INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol -    OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol - -    def configure_args(self): -        super(MRCDXBackfillHBase, self).configure_args() - -        self.add_passthru_arg('--hbase-table', -                              type=str, -                              default='wbgrp-journal-extract-0-qa', -                              help='HBase table to backfill into (must exist)') -        self.add_passthru_arg('--hbase-host', -                              type=str, -                              default='localhost', -                              help='HBase thrift API host to connect to') - -    def __init__(self, *args, **kwargs): - -        # Allow passthrough for tests -        if 'hb_table' in kwargs: -            self.hb_table = kwargs.pop('hb_table') -        else: -            self.hb_table = None - -        super(MRCDXBackfillHBase, self).__init__(*args, **kwargs) -        self.mime_filter = ['application/pdf'] - -    def mapper_init(self): - -        if self.hb_table is None: -            try: -                host = self.options.hbase_host -                # TODO: make these configs accessible from... mrconf.cfg? -                hb_conn = happybase.Connection(host=host, transport="framed", -                    protocol="compact") -            except Exception as err: -                raise Exception("Couldn't connect to HBase using host: {}".format(host)) -            self.hb_table = hb_conn.table(self.options.hbase_table) - -    def mapper(self, _, raw_cdx): - -        self.increment_counter('lines', 'total') - -        if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or -                raw_cdx.startswith('#')): - -            # Skip line -            # XXX: tests don't cover this path; need coverage! -            self.increment_counter('lines', 'invalid') -            return _, dict(status="invalid") - -        info = transform_line(raw_cdx) -        if info is None: -            self.increment_counter('lines', 'invalid') -            return _, dict(status="invalid") - -        if info['file:mime'] not in self.mime_filter: -            self.increment_counter('lines', 'skip') -            return _, dict(status="skip") - -        key = info.pop('key') -        info['f:c'] = json.dumps(info['f:c'], sort_keys=True, indent=None) -        info['file:cdx'] = json.dumps(info['file:cdx'], sort_keys=True, -            indent=None) - -        self.hb_table.put(key, info) -        self.increment_counter('lines', 'success') - -        yield _, dict(status="success") - -if __name__ == '__main__': -    MRCDXBackfillHBase.run() - | 
