aboutsummaryrefslogtreecommitdiffstats
path: root/python_hadoop/backfill_hbase_from_cdx.py
diff options
context:
space:
mode:
Diffstat (limited to 'python_hadoop/backfill_hbase_from_cdx.py')
-rwxr-xr-xpython_hadoop/backfill_hbase_from_cdx.py88
1 files changed, 88 insertions, 0 deletions
diff --git a/python_hadoop/backfill_hbase_from_cdx.py b/python_hadoop/backfill_hbase_from_cdx.py
new file mode 100755
index 0000000..6b2ec0b
--- /dev/null
+++ b/python_hadoop/backfill_hbase_from_cdx.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python3
+"""
+Streaming Hadoop script to import CDX metadata into the HBase fulltext table,
+primarily for URL-agnostic crawl de-duplication. Takes only "fulltext" file
+formats.
+
+Requires:
+- happybase
+- mrjob
+"""
+
+import json
+import happybase
+import mrjob
+from mrjob.job import MRJob
+from common import parse_cdx_line
+
+
+class MRCDXBackfillHBase(MRJob):
+
+ # CDX lines in; JSON status out
+ INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
+ OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
+
+ def configure_args(self):
+ super(MRCDXBackfillHBase, self).configure_args()
+
+ self.add_passthru_arg('--hbase-table',
+ type=str,
+ default='wbgrp-journal-extract-0-qa',
+ help='HBase table to backfill into (must exist)')
+ self.add_passthru_arg('--hbase-host',
+ type=str,
+ default='localhost',
+ help='HBase thrift API host to connect to')
+
+ def __init__(self, *args, **kwargs):
+ super(MRCDXBackfillHBase, self).__init__(*args, **kwargs)
+ self.mime_filter = ['application/pdf']
+ self.hb_table = None
+
+ def mapper_init(self):
+
+ if self.hb_table:
+ return
+
+ try:
+ host = self.options.hbase_host
+ # TODO: make these configs accessible from... mrconf.cfg?
+ hb_conn = happybase.Connection(host=host, transport="framed",
+ protocol="compact")
+ except Exception:
+ raise Exception("Couldn't connect to HBase using host: {}".format(host))
+ self.hb_table = hb_conn.table(self.options.hbase_table)
+
+ def mapper(self, _, raw_cdx):
+
+ self.increment_counter('lines', 'total')
+
+ if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
+ raw_cdx.startswith('#')):
+ self.increment_counter('lines', 'invalid')
+ yield _, dict(status="invalid", reason="line prefix")
+ return
+
+ info = parse_cdx_line(raw_cdx)
+ if info is None:
+ self.increment_counter('lines', 'invalid')
+ yield _, dict(status="invalid")
+ return
+
+ if info['file:mime'] not in self.mime_filter:
+ self.increment_counter('lines', 'skip')
+ yield _, dict(status="skip", reason="unwanted mimetype")
+ return
+
+ key = info.pop('key')
+ info['f:c'] = json.dumps(info['f:c'], sort_keys=True, indent=None)
+ info['file:cdx'] = json.dumps(info['file:cdx'],
+ sort_keys=True, indent=None)
+
+ self.hb_table.put(key, info)
+ self.increment_counter('lines', 'success')
+
+ yield _, dict(status="success")
+
+if __name__ == '__main__': # pragma: no cover
+ MRCDXBackfillHBase.run()