aboutsummaryrefslogtreecommitdiffstats
path: root/backfill/backfill_hbase_from_cdx.py
diff options
context:
space:
mode:
Diffstat (limited to 'backfill/backfill_hbase_from_cdx.py')
-rwxr-xr-xbackfill/backfill_hbase_from_cdx.py126
1 files changed, 62 insertions, 64 deletions
diff --git a/backfill/backfill_hbase_from_cdx.py b/backfill/backfill_hbase_from_cdx.py
index 757794a..92a6d32 100755
--- a/backfill/backfill_hbase_from_cdx.py
+++ b/backfill/backfill_hbase_from_cdx.py
@@ -6,6 +6,7 @@ formats.
Requires:
- happybase
+- mrjob
TODO:
- argparse
@@ -18,6 +19,8 @@ TODO:
import sys
import json
import happybase
+import mrjob
+from mrjob.job import MRJob
NORMAL_MIME = (
'application/pdf',
@@ -90,80 +93,75 @@ def test_transform_line():
assert transform_line(raw + " extra_field") == correct
-def run(in_lines, out_lines, status_lines, table, mime_filter=None):
+class MRCDXBackfillHBase(MRJob):
- if mime_filter is None:
- mime_filter = ['application/pdf']
- count_skip = count_invalid = count_fail = count_success = 0
+ # CDX lines in
+ INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
+ OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
+
+ def configure_args(self):
+ super(MRCDXBackfillHBase, self).configure_args()
+
+ self.add_passthru_arg('--hbase-table',
+ type=str,
+ default='wbgrp-journal-extract-0-qa',
+ help='HBase table to backfill into (must exist)')
+ self.add_passthru_arg('--hbase-host',
+ type=str,
+ default='localhost',
+ help='HBase thrift API host to connect to')
+
+ def __init__(self, *args, **kwargs):
+
+ # Allow passthrough for tests
+ if 'hb_table' in kwargs:
+ self.hb_table = kwargs.pop('hb_table')
+ else:
+ self.hb_table = None
+
+ super(MRCDXBackfillHBase, self).__init__(*args, **kwargs)
+
+ def mapper_init(self):
+
+ if self.hb_table is None:
+ try:
+ host = self.options.hbase_host
+ hb_conn = happybase.Connection(host=host)
+ except Exception as err:
+ raise Exception("Couldn't connect to HBase using host: {}".format(host))
+ self.hb_table = hb_conn.table(self.options.hbase_table)
+
+ self.mime_filter = ['application/pdf']
+
+ def mapper(self, _, raw_cdx):
+
+ self.increment_counter('lines', 'total')
- for raw_cdx in in_lines.readlines():
if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
raw_cdx.startswith('#')):
+
# Skip line
- count_skip += 1
- continue
+ self.increment_counter('lines', 'invalid')
+ return _, status
info = transform_line(raw_cdx)
if info is None:
- count_invalid += 1
- continue
- if info['file:mime'] not in mime_filter:
- count_skip += 1
- continue
+ self.increment_counter('lines', 'invalid')
+ return
+
+ if info['file:mime'] not in self.mime_filter:
+ self.increment_counter('lines', 'skip')
+ return
key = info.pop('key')
info['file:cdx'] = json.dumps(info['file:cdx'], sort_keys=True,
indent=None)
- try:
- table.put(key, info)
- count_success += 1
- except:
- status_lines.write("ERROR\n") # TODO:
- count_fail += 1
-
- status_lines.write('\n')
- status_lines.write('skip\t{}\n'.format(count_skip))
- status_lines.write('invalid\t{}\n'.format(count_invalid))
- status_lines.write('fail\t{}\n'.format(count_fail))
- status_lines.write('success\t{}\n'.format(count_success))
-
-
-def test_run():
-
- import io
- import happybase_mock
-
- out = io.StringIO()
- status = io.StringIO()
- raw = io.StringIO("""
-com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
-eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
-com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
-""")
-
- conn = happybase_mock.Connection()
- conn.create_table('wbgrp-journal-extract-test', {'file': {}, 'grobid0': {}})
-
- table = conn.table('wbgrp-journal-extract-test')
- run(raw, out, status, table)
-
- print(status.getvalue())
-
- assert table.row(b'1') == {}
- # HTTP 301
- assert table.row(b'3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {}
- # valid
- assert table.row(b'MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') != {}
- # text/plain
- assert table.row(b'6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD') == {}
-
- row = table.row(b'MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J')
- assert row[b'file:mime'] == b"application/pdf"
- json.loads(row[b'file:cdx'].decode('utf-8'))
-
-if __name__=="__main__":
- hb = happybase.Connection(host='')
- with hb.connection() as conn:
- table = conn.table('wbgrp-journal-extract-0-qa')
- run(sys.stdin, sys.stdout, sys.stderr, table)
+
+ self.hb_table.put(key, info)
+ self.increment_counter('lines', 'success')
+
+ yield _, dict(status="success")
+
+if __name__ == '__main__':
+ MRCDXBackfillHBase.run()