about | summary | refs | log | tree | commit | diff | stats
path: root/mapreduce
diff options
context:
space:
mode:
Diffstat (limited to 'mapreduce')
-rwxr-xr-x mapreduce/backfill_hbase_from_cdx.py 28
-rwxr-xr-x mapreduce/extraction_cdx_grobid.py 28
-rw-r--r-- mapreduce/tests/test_backfill_hbase_from_cdx.py 5
-rw-r--r-- mapreduce/tests/test_extraction_cdx_grobid.py 6
4 files changed, 31 insertions, 36 deletions
diff --git a/mapreduce/backfill_hbase_from_cdx.py b/mapreduce/backfill_hbase_from_cdx.py
index 57c18e4..2643195 100755
--- a/mapreduce/backfill_hbase_from_cdx.py
+++ b/mapreduce/backfill_hbase_from_cdx.py
@@ -43,27 +43,23 @@ class MRCDXBackfillHBase(MRJob):
help='HBase thrift API host to connect to')
def __init__(self, *args, **kwargs):
-
- # Allow passthrough for tests
- if 'hb_table' in kwargs:
- self.hb_table = kwargs.pop('hb_table')
- else:
- self.hb_table = None
-
super(MRCDXBackfillHBase, self).__init__(*args, **kwargs)
self.mime_filter = ['application/pdf']
+ self.hb_table = None
def mapper_init(self):
- if self.hb_table is None:
- try:
- host = self.options.hbase_host
- # TODO: make these configs accessible from... mrconf.cfg?
- hb_conn = happybase.Connection(host=host, transport="framed",
- protocol="compact")
- except Exception as err:
- raise Exception("Couldn't connect to HBase using host: {}".format(host))
- self.hb_table = hb_conn.table(self.options.hbase_table)
+ if self.hb_table:
+ return
+
+ try:
+ host = self.options.hbase_host
+ # TODO: make these configs accessible from... mrconf.cfg?
+ hb_conn = happybase.Connection(host=host, transport="framed",
+ protocol="compact")
+ except Exception as err:
+ raise Exception("Couldn't connect to HBase using host: {}".format(host))
+ self.hb_table = hb_conn.table(self.options.hbase_table)
def mapper(self, _, raw_cdx):
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index 0ba95e6..a4a13f8 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -53,15 +53,9 @@ class MRExtractCdxGrobid(MRJob):
help='URI of GROBID API Server')
def __init__(self, *args, **kwargs):
-
- # Allow passthrough for tests
- if 'hb_table' in kwargs:
- self.hb_table = kwargs.pop('hb_table')
- else:
- self.hb_table = None
-
super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
self.mime_filter = ['application/pdf']
+ self.hb_table = None
def grobid_process_fulltext(self, content):
r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
@@ -73,15 +67,17 @@ class MRExtractCdxGrobid(MRJob):
def mapper_init(self):
- if self.hb_table is None:
- try:
- host = self.options.hbase_host
- # TODO: make these configs accessible from... mrconf.cfg?
- hb_conn = happybase.Connection(host=host, transport="framed",
- protocol="compact")
- except Exception as err:
- raise Exception("Couldn't connect to HBase using host: {}".format(host))
- self.hb_table = hb_conn.table(self.options.hbase_table)
+ if self.hb_table:
+ return
+
+ try:
+ host = self.options.hbase_host
+ # TODO: make these configs accessible from... mrconf.cfg?
+ hb_conn = happybase.Connection(host=host, transport="framed",
+ protocol="compact")
+ except Exception as err:
+ raise Exception("Couldn't connect to HBase using host: {}".format(host))
+ self.hb_table = hb_conn.table(self.options.hbase_table)
def parse_line(self, raw_cdx):
diff --git a/mapreduce/tests/test_backfill_hbase_from_cdx.py b/mapreduce/tests/test_backfill_hbase_from_cdx.py
index 2dbbc25..070662b 100644
--- a/mapreduce/tests/test_backfill_hbase_from_cdx.py
+++ b/mapreduce/tests/test_backfill_hbase_from_cdx.py
@@ -15,12 +15,13 @@ def job():
Note: this mock only seems to work with job.run_mapper(), not job.run();
the later results in a separate instantiation without the mock?
"""
+ job = MRCDXBackfillHBase(['--no-conf', '-'])
+
conn = happybase_mock.Connection()
conn.create_table('wbgrp-journal-extract-test',
{'file': {}, 'grobid0': {}, 'f': {}})
- table = conn.table('wbgrp-journal-extract-test')
+ job.hb_table = conn.table('wbgrp-journal-extract-test')
- job = MRCDXBackfillHBase(['--no-conf', '-'], hb_table=table)
return job
diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py
index 46a89aa..02d2b41 100644
--- a/mapreduce/tests/test_extraction_cdx_grobid.py
+++ b/mapreduce/tests/test_extraction_cdx_grobid.py
@@ -18,14 +18,16 @@ def job():
Note: this mock only seems to work with job.run_mapper(), not job.run();
the later results in a separate instantiation without the mock?
"""
+ job = MRExtractCdxGrobid(['--no-conf', '-'])
+
conn = happybase_mock.Connection()
conn.create_table('wbgrp-journal-extract-test',
{'file': {}, 'grobid0': {}, 'f': {}})
- table = conn.table('wbgrp-journal-extract-test')
+ job.hb_table = conn.table('wbgrp-journal-extract-test')
- job = MRExtractCdxGrobid(['--no-conf', '-'], hb_table=table)
return job
+
@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
@responses.activate
def test_mapper_lines(mock_fetch, job):