aboutsummaryrefslogtreecommitdiffstats
path: root/mapreduce/backfill_hbase_from_cdx.py
diff options
context:
space:
mode:
Diffstat (limited to 'mapreduce/backfill_hbase_from_cdx.py')
-rwxr-xr-xmapreduce/backfill_hbase_from_cdx.py88
1 files changed, 0 insertions, 88 deletions
diff --git a/mapreduce/backfill_hbase_from_cdx.py b/mapreduce/backfill_hbase_from_cdx.py
deleted file mode 100755
index 6b2ec0b..0000000
--- a/mapreduce/backfill_hbase_from_cdx.py
+++ /dev/null
@@ -1,88 +0,0 @@
-#!/usr/bin/env python3
-"""
-Streaming Hadoop script to import CDX metadata into the HBase fulltext table,
-primarily for URL-agnostic crawl de-duplication. Takes only "fulltext" file
-formats.
-
-Requires:
-- happybase
-- mrjob
-"""
-
-import json
-import happybase
-import mrjob
-from mrjob.job import MRJob
-from common import parse_cdx_line
-
-
-class MRCDXBackfillHBase(MRJob):
-
- # CDX lines in; JSON status out
- INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
- OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
-
- def configure_args(self):
- super(MRCDXBackfillHBase, self).configure_args()
-
- self.add_passthru_arg('--hbase-table',
- type=str,
- default='wbgrp-journal-extract-0-qa',
- help='HBase table to backfill into (must exist)')
- self.add_passthru_arg('--hbase-host',
- type=str,
- default='localhost',
- help='HBase thrift API host to connect to')
-
- def __init__(self, *args, **kwargs):
- super(MRCDXBackfillHBase, self).__init__(*args, **kwargs)
- self.mime_filter = ['application/pdf']
- self.hb_table = None
-
- def mapper_init(self):
-
- if self.hb_table:
- return
-
- try:
- host = self.options.hbase_host
- # TODO: make these configs accessible from... mrconf.cfg?
- hb_conn = happybase.Connection(host=host, transport="framed",
- protocol="compact")
- except Exception:
- raise Exception("Couldn't connect to HBase using host: {}".format(host))
- self.hb_table = hb_conn.table(self.options.hbase_table)
-
- def mapper(self, _, raw_cdx):
-
- self.increment_counter('lines', 'total')
-
- if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
- raw_cdx.startswith('#')):
- self.increment_counter('lines', 'invalid')
- yield _, dict(status="invalid", reason="line prefix")
- return
-
- info = parse_cdx_line(raw_cdx)
- if info is None:
- self.increment_counter('lines', 'invalid')
- yield _, dict(status="invalid")
- return
-
- if info['file:mime'] not in self.mime_filter:
- self.increment_counter('lines', 'skip')
- yield _, dict(status="skip", reason="unwanted mimetype")
- return
-
- key = info.pop('key')
- info['f:c'] = json.dumps(info['f:c'], sort_keys=True, indent=None)
- info['file:cdx'] = json.dumps(info['file:cdx'],
- sort_keys=True, indent=None)
-
- self.hb_table.put(key, info)
- self.increment_counter('lines', 'success')
-
- yield _, dict(status="success")
-
-if __name__ == '__main__': # pragma: no cover
- MRCDXBackfillHBase.run()