diff options
Diffstat (limited to 'backfill/backfill_hbase_from_cdx.py')
-rwxr-xr-x | backfill/backfill_hbase_from_cdx.py | 126 |
1 files changed, 62 insertions, 64 deletions
diff --git a/backfill/backfill_hbase_from_cdx.py b/backfill/backfill_hbase_from_cdx.py index 757794a..92a6d32 100755 --- a/backfill/backfill_hbase_from_cdx.py +++ b/backfill/backfill_hbase_from_cdx.py @@ -6,6 +6,7 @@ formats. Requires: - happybase +- mrjob TODO: - argparse @@ -18,6 +19,8 @@ TODO: import sys import json import happybase +import mrjob +from mrjob.job import MRJob NORMAL_MIME = ( 'application/pdf', @@ -90,80 +93,75 @@ def test_transform_line(): assert transform_line(raw + " extra_field") == correct -def run(in_lines, out_lines, status_lines, table, mime_filter=None): +class MRCDXBackfillHBase(MRJob): - if mime_filter is None: - mime_filter = ['application/pdf'] - count_skip = count_invalid = count_fail = count_success = 0 + # CDX lines in + INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol + OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol + + def configure_args(self): + super(MRCDXBackfillHBase, self).configure_args() + + self.add_passthru_arg('--hbase-table', + type=str, + default='wbgrp-journal-extract-0-qa', + help='HBase table to backfill into (must exist)') + self.add_passthru_arg('--hbase-host', + type=str, + default='localhost', + help='HBase thrift API host to connect to') + + def __init__(self, *args, **kwargs): + + # Allow passthrough for tests + if 'hb_table' in kwargs: + self.hb_table = kwargs.pop('hb_table') + else: + self.hb_table = None + + super(MRCDXBackfillHBase, self).__init__(*args, **kwargs) + + def mapper_init(self): + + if self.hb_table is None: + try: + host = self.options.hbase_host + hb_conn = happybase.Connection(host=host) + except Exception as err: + raise Exception("Couldn't connect to HBase using host: {}".format(host)) + self.hb_table = hb_conn.table(self.options.hbase_table) + + self.mime_filter = ['application/pdf'] + + def mapper(self, _, raw_cdx): + + self.increment_counter('lines', 'total') - for raw_cdx in in_lines.readlines(): if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or raw_cdx.startswith('#')): + # Skip line - count_skip += 1 - continue + self.increment_counter('lines', 'invalid') + return _, status info = transform_line(raw_cdx) if info is None: - count_invalid += 1 - continue - if info['file:mime'] not in mime_filter: - count_skip += 1 - continue + self.increment_counter('lines', 'invalid') + return + + if info['file:mime'] not in self.mime_filter: + self.increment_counter('lines', 'skip') + return key = info.pop('key') info['file:cdx'] = json.dumps(info['file:cdx'], sort_keys=True, indent=None) - try: - table.put(key, info) - count_success += 1 - except: - status_lines.write("ERROR\n") # TODO: - count_fail += 1 - - status_lines.write('\n') - status_lines.write('skip\t{}\n'.format(count_skip)) - status_lines.write('invalid\t{}\n'.format(count_invalid)) - status_lines.write('fail\t{}\n'.format(count_fail)) - status_lines.write('success\t{}\n'.format(count_success)) - - -def test_run(): - - import io - import happybase_mock - - out = io.StringIO() - status = io.StringIO() - raw = io.StringIO(""" -com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz -eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz -com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz -""") - - conn = happybase_mock.Connection() - conn.create_table('wbgrp-journal-extract-test', {'file': {}, 'grobid0': {}}) - - table = conn.table('wbgrp-journal-extract-test') - run(raw, out, status, table) - - print(status.getvalue()) - - assert table.row(b'1') == {} - # HTTP 301 - assert table.row(b'3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {} - # valid - assert table.row(b'MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') != {} - # text/plain - assert table.row(b'6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD') == {} - - row = table.row(b'MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') - assert row[b'file:mime'] == b"application/pdf" - json.loads(row[b'file:cdx'].decode('utf-8')) - -if __name__=="__main__": - hb = happybase.Connection(host='') - with hb.connection() as conn: - table = conn.table('wbgrp-journal-extract-0-qa') - run(sys.stdin, sys.stdout, sys.stderr, table) + + self.hb_table.put(key, info) + self.increment_counter('lines', 'success') + + yield _, dict(status="success") + +if __name__ == '__main__': + MRCDXBackfillHBase.run() |