aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2018-08-27 14:32:19 -0700
committerBryan Newbold <bnewbold@archive.org>2018-08-27 14:32:19 -0700
commita7156b06340460e0e70a19891e161b8b8f4f2078 (patch)
tree029a04f6558b99f060e845ab1ef00baa6cd39600
parent4c374c647d8fecce827cabcb579e5aae20f198db (diff)
parentd2b4da4c55a24468a0cbfdc9f567449d4e913331 (diff)
downloadsandcrawler-a7156b06340460e0e70a19891e161b8b8f4f2078.tar.gz
sandcrawler-a7156b06340460e0e70a19891e161b8b8f4f2078.zip
Merge branch 'bnewbold-ungrobided'
-rw-r--r--hbase/howto.md14
-rwxr-xr-xplease60
-rw-r--r--python/common.py26
-rwxr-xr-xpython/extraction_ungrobided.py268
-rw-r--r--python/tests/files/example_ungrobided.tsv20
-rw-r--r--python/tests/test_extraction_ungrobided.py176
-rw-r--r--scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala67
-rw-r--r--scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala72
8 files changed, 703 insertions, 0 deletions
diff --git a/hbase/howto.md b/hbase/howto.md
index fcf561f..26d33f4 100644
--- a/hbase/howto.md
+++ b/hbase/howto.md
@@ -26,3 +26,17 @@ To interact with this config, use happybase (python) config:
# Test connection
conn.tables()
+## Queries From Shell
+
+Fetch all columns for a single row:
+
+ hbase> get 'wbgrp-journal-extract-0-qa', 'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ'
+
+Fetch multiple columns for a single row, using column families:
+
+ hbase> get 'wbgrp-journal-extract-0-qa', 'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ', 'f', 'file'
+
+Scan a fixed number of rows (here 5) starting at a specific key prefix, all
+columns:
+
+ hbase> scan 'wbgrp-journal-extract-0-qa',{LIMIT=>5,STARTROW=>'sha1:A'}
diff --git a/please b/please
index c888bbc..a2658ab 100755
--- a/please
+++ b/please
@@ -64,16 +64,47 @@ def run_extract(args):
--grobid-uri {grobid_uri} \
-r hadoop \
-c mrjob.conf \
+ --output-dir {output} \
+ --no-output \
--archive venv-current.tar.gz#venv \
--jobconf mapred.line.input.format.linespermap=8000 \
--jobconf mapreduce.job.queuename=extraction \
--jobconf mapred.task.timeout=3600000 \
{input_cdx}
""".format(hbase_host=HBASE_HOST, env=args.env,
+ output=output,
input_cdx=args.input_cdx,
grobid_uri=GROBID_URI)
subprocess.call(cmd, shell=True)
+def run_extract_ungrobided(args):
+ if args.rebuild:
+ rebuild_python()
+ print("Starting extractungrobided job...")
+ output = "{}/output-{}/{}-extract-ungrobided".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """cd python;
+ pipenv run ./extraction_ungrobided.py \
+ --hbase-host {hbase_host} \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --grobid-uri {grobid_uri} \
+ -r hadoop \
+ -c mrjob.conf \
+ --output-dir {output} \
+ --no-output \
+ --archive venv-current.tar.gz#venv \
+ --jobconf mapred.line.input.format.linespermap=8000 \
+ --jobconf mapreduce.job.queuename=extraction \
+ --jobconf mapred.task.timeout=3600000 \
+ {input_ungrobided}
+ """.format(hbase_host=HBASE_HOST, env=args.env,
+ input_ungrobided=args.input_ungrobided,
+ output=output,
+ grobid_uri=GROBID_URI)
+ subprocess.call(cmd, shell=True)
+
def run_rowcount(args):
if args.rebuild:
rebuild_scalding()
@@ -257,6 +288,27 @@ def run_keysmissingcol(args):
env=args.env)
subprocess.call(cmd, shell=True)
+def run_dumpungrobided(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting dumpungrobided job...")
+ output = "{}/output-{}/{}-dumpungrobided".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.DumpUnGrobidedJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --zookeeper-hosts {zookeeper_hosts} \
+ --output {output}""".format(
+ output=output,
+ zookeeper_hosts=ZOOKEEPER_HOSTS,
+ env=args.env)
+ subprocess.call(cmd, shell=True)
+
def main():
parser = argparse.ArgumentParser()
@@ -281,6 +333,11 @@ def main():
sub_extract.add_argument('input_cdx',
help="full HDFS path of CDX file to extract")
+ sub_extractungrobided = subparsers.add_parser('extract-ungrobided')
+ sub_extractungrobided.set_defaults(func=run_extract_ungrobided)
+ sub_extractungrobided.add_argument('input_ungrobided',
+ help="full HDFS path of 'ungrobided' file to extract")
+
sub_rowcount = subparsers.add_parser('row-count')
sub_rowcount.set_defaults(func=run_rowcount)
@@ -320,6 +377,9 @@ def main():
sub_keysmissingcol.add_argument('column',
help="column to SCAN for missing keys")
+ sub_dumpungrobided = subparsers.add_parser('dump-ungrobided')
+ sub_dumpungrobided.set_defaults(func=run_dumpungrobided)
+
args = parser.parse_args()
if not args.__dict__.get("func"):
print("tell me what to do! (try --help)")
diff --git a/python/common.py b/python/common.py
index 6710044..e596b35 100644
--- a/python/common.py
+++ b/python/common.py
@@ -1,4 +1,5 @@
+import json
from datetime import datetime
NORMAL_MIME = (
@@ -71,3 +72,28 @@ def parse_cdx_line(raw_cdx):
# 'i' intentionally not set
heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix}
+
+def parse_ungrobided_line(raw_line):
+
+ line = raw_line.strip().split("\t")
+ if len(line) != 4:
+ return None
+
+ key = line[0]
+ mime = normalize_mime(line[2])
+ try:
+ f_c = json.loads(line[1])
+ cdx = json.loads(line[3])
+ except json.JSONDecodeError:
+ return None
+
+ if not (key[5:].isalnum() and len(key) == 37 and mime != None):
+ print(mime)
+ print(key)
+ print("FAIL")
+ return None
+
+ if '-' in (key, mime, f_c, cdx):
+ return None
+
+ return {'key': key, 'file:mime': mime, 'file:cdx': cdx, 'f:c': f_c}
diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py
new file mode 100755
index 0000000..4074112
--- /dev/null
+++ b/python/extraction_ungrobided.py
@@ -0,0 +1,268 @@
+#!/usr/bin/env python3
+"""
+Variant of extraction_cdx_grobid which takes a partial metadata list as input
+instead of CDX.
+
+This task list is dumped by another Hadoop job which scans over the HBase table
+quickly, which allows this job to skip a (relatively) expensive HBase read
+per-row.
+
+Requires:
+- happybase
+- mrjob
+- wayback/GWB libraries
+"""
+
+# XXX: some broken MRO thing going on in here due to python3 object wrangling
+# in `wayback` library. Means we can't run pylint.
+# pylint: skip-file
+
+import xml
+import json
+import raven
+import struct
+import requests
+import happybase
+import mrjob
+from mrjob.job import MRJob
+import wayback.exception
+from wayback.resource import Resource
+from wayback.resource import ArcResource
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory
+from common import parse_ungrobided_line
+from grobid2json import teixml2json
+
+# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
+sentry_client = raven.Client()
+
+# Specific poison-pill rows we should skip
+KEY_BLACKLIST = (
+ 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format"
+)
+
+class MRExtractUnGrobided(MRJob):
+
+ # "ungrobided" TSV lines in; JSON status out
+ #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'
+ #INPUT_PROTOCOL = mrjob.protocol.RawProtocol
+ INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
+ OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
+
+ def configure_args(self):
+ super(MRExtractUnGrobided, self).configure_args()
+
+ self.add_passthru_arg('--hbase-table',
+ type=str,
+ default='wbgrp-journal-extract-0-qa',
+ help='HBase table to backfill into (must exist)')
+ self.add_passthru_arg('--hbase-host',
+ type=str,
+ default='localhost',
+ help='HBase thrift API host to connect to')
+ self.add_passthru_arg('--grobid-uri',
+ type=str,
+ default='http://localhost:8070',
+ help='URI of GROBID API Server')
+ self.add_passthru_arg('--warc-uri-prefix',
+ type=str,
+ default='https://archive.org/serve/',
+ help='URI where WARCs can be found')
+
+ def __init__(self, *args, **kwargs):
+ super(MRExtractUnGrobided, self).__init__(*args, **kwargs)
+ self.mime_filter = ['application/pdf']
+ self.hb_table = None
+
+ def grobid_process_fulltext(self, content):
+ r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
+ files={'input': content})
+ return r
+
+ def mapper_init(self):
+
+ if self.hb_table:
+ return
+
+ sentry_client.tags_context(dict(hbase_table=self.options.hbase_table))
+ try:
+ host = self.options.hbase_host
+ # TODO: make these configs accessible from... mrconf.cfg?
+ hb_conn = happybase.Connection(host=host, transport="framed",
+ protocol="compact")
+ except Exception:
+ raise Exception("Couldn't connect to HBase using host: {}".format(host))
+ self.hb_table = hb_conn.table(self.options.hbase_table)
+
+ def parse_ungrobided_line(self, raw_line):
+ """Line should be TSV and have non-null fields:
+
+ - key (string)
+ - f:c (string, json)
+ - file:mime (string)
+ - file:cdx (string, json)
+ """
+
+ if (raw_line.startswith(' ') or raw_line.startswith('#')):
+ return None, dict(status="invalid", reason="line prefix", input=raw_line)
+
+ info = parse_ungrobided_line(raw_line)
+ if info is None:
+ return None, dict(status="invalid", reason="ungrobided parse")
+
+ if info['file:mime'] not in self.mime_filter:
+ return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime'])
+
+ # If warc is not item/file.(w)arc.gz form, skip it
+ if len(info['file:cdx']['warc'].split('/')) != 2:
+ return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc'])
+
+ return info, None
+
+ def fetch_warc_content(self, warc_path, offset, c_size):
+ warc_uri = self.options.warc_uri_prefix + warc_path
+ try:
+ rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
+ gwb_record = rstore.load_resource(warc_uri, offset, c_size)
+ except wayback.exception.ResourceUnavailable:
+ return None, dict(status="error",
+ reason="failed to load file contents from wayback/petabox")
+
+ if gwb_record.get_status()[0] != 200:
+ return None, dict(status="error",
+ reason="archived HTTP response (WARC) was not 200",
+ warc_status=gwb_record.get_status()[0])
+ return gwb_record.open_raw_content().read(), None
+
+ def extract(self, info):
+
+ # Fetch data from WARCs in petabox
+ original_content, status = self.fetch_warc_content(
+ info['file:cdx']['warc'],
+ info['file:cdx']['offset'],
+ info['file:cdx']['c_size'])
+ if status:
+ return None, status
+
+ info['file:size'] = len(original_content)
+
+ # Submit to GROBID
+ try:
+ grobid_response = self.grobid_process_fulltext(original_content)
+ except requests.exceptions.ConnectionError:
+ return None, dict(status="error", reason="connection to GROBID worker")
+
+ info['grobid0:status_code'] = grobid_response.status_code
+
+ # 4 MByte XML size limit; don't record GROBID status on this path
+ if len(grobid_response.content) > 4000000:
+ info['grobid0:status'] = {'status': 'oversize'}
+ return info, dict(status="oversize", reason="TEI response was too large")
+
+ if grobid_response.status_code != 200:
+ # response.text is .content decoded as utf-8
+ info['grobid0:status'] = dict(status='error', description=grobid_response.text)
+ return info, dict(status="error", reason="non-200 GROBID HTTP status",
+ extra=grobid_response.text)
+
+ info['grobid0:status'] = {'status': 'partial'}
+ info['grobid0:tei_xml'] = grobid_response.content
+
+ # Convert TEI XML to JSON
+ try:
+ info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
+ except xml.etree.ElementTree.ParseError:
+ info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
+ return info, info['grobid0:status']
+ except ValueError:
+ info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
+ return info, info['grobid0:status']
+
+ tei_metadata = info['grobid0:tei_json'].copy()
+ for k in ('body', 'annex'):
+ # Remove fulltext (copywritted) content
+ tei_metadata.pop(k, None)
+ info['grobid0:metadata'] = tei_metadata
+
+ # Determine extraction "quality"
+ # TODO:
+
+ info['grobid0:quality'] = None
+ info['grobid0:status'] = {'status': 'success'}
+
+ return info, None
+
+ @sentry_client.capture_exceptions
+ def mapper(self, _, raw_line):
+ """
+ 1. parse filtered line
+ 2. fetch data from wayback
+ 3. submit to GROBID
+ 4. convert GROBID response to JSON (and metadata)
+ 6. determine "quality"
+ 6. push results to hbase
+ """
+
+ self.increment_counter('lines', 'total')
+
+ # Parse line and filter down
+ info, status = self.parse_ungrobided_line(raw_line)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ yield _, status
+ return
+ key = info['key']
+ if key in KEY_BLACKLIST:
+ self.increment_counter('lines', 'blacklist')
+ yield _, dict(status='blacklist', key=key)
+ return
+
+ # Note: this may not get "cleared" correctly
+ sentry_client.extra_context(dict(row_key=key))
+
+ # Do the extraction
+ info, status = self.extract(info)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ status['key'] = key
+ yield _, status
+ return
+ extraction_status = status
+
+ # Decide what to bother inserting back into HBase
+ # Basically, don't overwrite backfill fields.
+ grobid_status_code = info.get('grobid0:status_code', None)
+ for k in list(info.keys()):
+ if k.encode('utf-8') in ('f:c', 'file:mime', 'file:cdx'):
+ info.pop(k)
+
+ # Convert fields to binary
+ for k in list(info.keys()):
+ if info[k] is None:
+ info.pop(k)
+ # NOTE: we're not actually sending these f:*, file:* keys...
+ elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+ 'grobid0:metadata'):
+ assert type(info[k]) == dict
+ info[k] = json.dumps(info[k], sort_keys=True, indent=None)
+ elif k in ('file:size', 'grobid0:status_code'):
+ # encode as int64 in network byte order
+ if info[k] != {} and info[k] != None:
+ info[k] = struct.pack('!q', info[k])
+
+ key = info.pop('key')
+ self.hb_table.put(key, info)
+ self.increment_counter('lines', 'success')
+
+ if extraction_status is not None:
+ yield _, dict(status="partial", key=key,
+ grobid_status_code=grobid_status_code,
+ reason=extraction_status['reason'])
+ else:
+ yield _, dict(status="success",
+ grobid_status_code=grobid_status_code, key=key,
+ extra=extraction_status)
+
+
+if __name__ == '__main__': # pragma: no cover
+ MRExtractUnGrobided.run()
diff --git a/python/tests/files/example_ungrobided.tsv b/python/tests/files/example_ungrobided.tsv
new file mode 100644
index 0000000..9263b6f
--- /dev/null
+++ b/python/tests/files/example_ungrobided.tsv
@@ -0,0 +1,20 @@
+sha1:23LOSW2QVMKUYXPFZBXQHBBNQR45WTMU {"c": 1, "d": "2017-10-27T22:21:13", "f": "PDFS-20171027214658-00155.warc.gz", "o": 984263791, "u": "http://circ.ahajournals.org/content/circulationaha/53/6/965.full.pdf"} application/pdf {"c_size": 1050532, "dt": "20171027222113", "offset": 984263791, "surt": "org,ahajournals,circ)/content/circulationaha/53/6/965.full.pdf", "url": "http://circ.ahajournals.org/content/circulationaha/53/6/965.full.pdf", "warc": "PDFS-20171027125450-crawl815/PDFS-20171027214658-00155.warc.gz"}
+sha1:23M2N262M5TWB7F3BVB6ESD3Q26SMPFA {"c": 1, "d": "2012-09-29T07:05:16", "f": "ARCHIVEIT-219-QUARTERLY-FWGZDI-20120929065657-00119-crawling203.us.archive.org-6680.warc.gz", "o": 83570746, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=37&id=225&artlang=en"} application/pdf {"c_size": 3590, "dt": "20120929070516", "offset": 83570746, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=37&id=225", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=37&id=225&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-FWGZDI-00001/ARCHIVEIT-219-QUARTERLY-FWGZDI-20120929065657-00119-crawling203.us.archive.org-6680.warc.gz"}
+sha1:23MFQLDGP4WJD67BS7ERMYQUF7TGCG5X {"c": 1, "d": "2017-08-25T15:19:28", "f": "MSAG-PDF-CRAWL-2017-08-04-20170825143335512-08107-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", "o": 573475485, "u": "http://www.bloodjournal.org/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true"} application/pdf {"c_size": 3411470, "dt": "20170825151928", "offset": 573475485, "surt": "org,bloodjournal)/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true", "url": "http://www.bloodjournal.org/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true", "warc": "MSAG-PDF-CRAWL-2017-08-04-20170825114428485-08102-08111-wbgrp-svc284/MSAG-PDF-CRAWL-2017-08-04-20170825143335512-08107-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"}
+sha1:23MG6K5Z3JENYCZ2OGTNOJ7QPYANJRCZ {"c": 1, "d": "2017-10-10T11:29:50", "f": "PDFS-20171010110639-00278.warc.gz", "o": 665222706, "u": "http://circ.ahajournals.org/content/circulationaha/53/5/797.full.pdf?download=true"} application/pdf {"c_size": 1107121, "dt": "20171010112950", "offset": 665222706, "surt": "org,ahajournals,circ)/content/circulationaha/53/5/797.full.pdf?download=true", "url": "http://circ.ahajournals.org/content/circulationaha/53/5/797.full.pdf?download=true", "warc": "PDFS-20171010110639-crawl815/PDFS-20171010110639-00278.warc.gz"}
+sha1:23MN67JQKDWRUXJMXXJ2GX6O43SQIV76 {"c": 1, "d": "2015-06-06T18:17:08", "f": "ARCHIVEIT-219-QUARTERLY-9582-20150606125000169-00071-wbgrp-crawl067.us.archive.org-6440.warc.gz", "o": 603211220, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=95&artlang=en"} application/pdf {"c_size": 3450, "dt": "20150606181708", "offset": 603211220, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=95", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=95&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-9582-00007/ARCHIVEIT-219-QUARTERLY-9582-20150606125000169-00071-wbgrp-crawl067.us.archive.org-6440.warc.gz"}
+sha1:23MQCEOMQS5SMZCXIPNQ4E3ZKOCF6DIM {"c": 1, "d": "2004-01-02T19:45:22", "f": "DU_crawl10.20040102194453.arc.gz", "o": 38062592, "u": "http://www.csupomona.edu:80/~darsadmin/ACTIONITEMS.pdf"} application/pdf {"c_size": 15406, "dt": "20040102194522", "offset": 38062592, "surt": "edu,csupomona)/~darsadmin/actionitems.pdf", "url": "http://www.csupomona.edu:80/~darsadmin/ACTIONITEMS.pdf", "warc": "DU_crawl10.20040102181929-c/DU_crawl10.20040102194453.arc.gz"}
+sha1:23NKO4TW6XCESXMSUOOICI3AXVK6Z5BL {"c": 1, "d": "2015-04-27T11:16:33", "f": "eric.ed.gov-inf-20150409-030712-1648j-00064.warc.gz", "o": 3872820483, "u": "http://files.eric.ed.gov/fulltext/ED088632.pdf"} application/pdf {"c_size": 2223528, "dt": "20150427111633", "offset": 3872820483, "surt": "gov,ed,eric,files)/fulltext/ed088632.pdf", "url": "http://files.eric.ed.gov/fulltext/ED088632.pdf", "warc": "archiveteam_archivebot_go_20150427150006/eric.ed.gov-inf-20150409-030712-1648j-00064.warc.gz"}
+sha1:23NW2EPLXDA6UBIJLQMM2DJ2K3GL3WTB {"c": 1, "d": "2014-08-13T09:04:30", "f": "WIDE-20140813084304-09684.warc.gz", "o": 726289594, "u": "http://research.sdccd.edu/docs/Accreditation/2012%20Surveys/Employee%20-%20Briefing/Mesa%20College%202012%20Employee%20Feedback%20Survey%20Briefing.pdf"} application/pdf {"c_size": 3472527, "dt": "20140813090430", "offset": 726289594, "surt": "edu,sdccd,research)/docs/accreditation/2012%20surveys/employee%20-%20briefing/mesa%20college%202012%20employee%20feedback%20survey%20briefing.pdf", "url": "http://research.sdccd.edu/docs/Accreditation/2012%20Surveys/Employee%20-%20Briefing/Mesa%20College%202012%20Employee%20Feedback%20Survey%20Briefing.pdf", "warc": "WIDE-20140813074743-crawl424/WIDE-20140813084304-09684.warc.gz"}
+sha1:23OQICQ4IBVNHJBWJX5ON3QR26KNMQNT {"c": 1, "d": "2010-06-29T00:35:17", "f": "EDG-20100628234135-01241-ia360918.us.archive.org.warc.gz", "o": 572430160, "u": "http://journalism.arizona.edu/news/rockypt6.pdf"} application/pdf {"c_size": 194706, "dt": "20100629003517", "offset": 572430160, "surt": "edu,arizona,journalism)/news/rockypt6.pdf", "url": "http://journalism.arizona.edu/news/rockypt6.pdf", "warc": "EDG-20100628214935-01235-01243-ia360918-20100629023741-00000/EDG-20100628234135-01241-ia360918.us.archive.org.warc.gz"}
+sha1:23OT2AAYPJ3Z5ZOQXVJJTVTKY6QUPICI {"c": 1, "d": "2007-02-25T17:49:01", "f": "38_0_20070225174831_crawl28.arc.gz", "o": 93868066, "u": "http://www.ece.tufts.edu:80/~hopwood/tampa-proceedings.pdf"} application/pdf {"c_size": 162157, "dt": "20070225174901", "offset": 93868066, "surt": "edu,tufts,ece)/~hopwood/tampa-proceedings.pdf", "url": "http://www.ece.tufts.edu:80/~hopwood/tampa-proceedings.pdf", "warc": "38_0_20070225173722_crawl28-c/38_0_20070225174831_crawl28.arc.gz"}
+sha1:23OUFX3ZYMF53HY4RUONR5PKN4HXN4O3 {"c": 1, "d": "2004-05-26T06:45:34", "f": "DW_crawl10.20040526064432.arc.gz", "o": 67593910, "u": "http://207.36.165.114:80/NewOrleans/Papers/1301466.pdf"} application/pdf {"c_size": 306879, "dt": "20040526064534", "offset": 67593910, "surt": "114,165,36,207)/neworleans/papers/1301466.pdf", "url": "http://207.36.165.114:80/NewOrleans/Papers/1301466.pdf", "warc": "DW_crawl10.20040525230808-c/DW_crawl10.20040526064432.arc.gz"}
+sha1:23PA23UIWCBA3CSTDK2JYX7ZIVOHULFG {"c": 1, "d": "2016-02-05T21:48:33", "f": "NLNZ-NZ-CRAWL-005-20160205211003375-02839-6291~wbgrp-crawl007.us.archive.org~8443.warc.gz", "o": 630386943, "u": "http://homepages.engineering.auckland.ac.nz/~smohan/Outreach/Docs/2013/TTU_REU2013.pdf"} application/pdf {"c_size": 2979614, "dt": "20160205214833", "offset": 630386943, "surt": "nz,ac,auckland,engineering,homepages)/~smohan/outreach/docs/2013/ttu_reu2013.pdf", "url": "http://homepages.engineering.auckland.ac.nz/~smohan/Outreach/Docs/2013/TTU_REU2013.pdf", "warc": "NLNZ-NZ-CRAWL-005-20160205211003375-02839-02848-wbgrp-crawl007/NLNZ-NZ-CRAWL-005-20160205211003375-02839-6291~wbgrp-crawl007.us.archive.org~8443.warc.gz"}
+sha1:23PGC74CTD7P6PCF3MZZZJMPYFXRK3OB {"c": 1, "d": "2005-03-17T15:05:51", "f": "EC_binary1_crawl30.20050317150502.arc.gz", "o": 75675778, "u": "http://www.csupomona.edu:80/%7Eengineering/programs/courses/aro/course_outlines/aro_407.pdf"} application/pdf {"c_size": 4842, "dt": "20050317150551", "offset": 75675778, "surt": "edu,csupomona)/~engineering/programs/courses/aro/course_outlines/aro_407.pdf", "url": "http://www.csupomona.edu:80/%7Eengineering/programs/courses/aro/course_outlines/aro_407.pdf", "warc": "EC_binary1_crawl30.20050317135651-c/EC_binary1_crawl30.20050317150502.arc.gz"}
+sha1:23PKJEQWUJAIQQSLP3GCCC5VDXN4RFCX {"c": 1, "d": "2017-10-10T23:50:37", "f": "WIDE-20171010214240-16560.warc.gz", "o": 962106404, "u": "http://www.nbrb.by/bv/articles/8997.pdf"} application/pdf {"c_size": 273375, "dt": "20171010235037", "offset": 962106404, "surt": "by,nbrb)/bv/articles/8997.pdf", "url": "http://www.nbrb.by/bv/articles/8997.pdf", "warc": "WIDE-20171010202419-crawl424/WIDE-20171010214240-16560.warc.gz"}
+sha1:23PRILJUIQUKHRYQIUYAKSBFPH53FOGT {"c": 1, "d": "2017-07-14T18:51:38", "f": "WIDE-20170714181144-06521.warc.gz", "o": 820382225, "u": "http://carsandracingstuff.com/library/articles/32538.pdf"} application/pdf {"c_size": 125426, "dt": "20170714185138", "offset": 820382225, "surt": "com,carsandracingstuff)/library/articles/32538.pdf", "url": "http://carsandracingstuff.com/library/articles/32538.pdf", "warc": "WIDE-20170714174218-crawl426/WIDE-20170714181144-06521.warc.gz"}
+sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI {"c": 1, "d": "2016-06-09T00:27:36", "f": "WIDE-20160609001810-06993.warc.gz", "o": 287880616, "u": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf"} application/pdf {"c_size": 68262, "dt": "20160609002736", "offset": 287880616, "surt": "eu,case-research)/sites/default/files/publications/18092393_e-brief_dabrowski_monetary_policy_final_0.pdf", "url": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf", "warc": "WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz"}
+sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D {"c": 1, "d": "2016-01-07T03:29:03", "f": "MUSEUM-20160107025230-02354.warc.gz", "o": 413484441, "u": "http://www.portlandoregon.gov/fire/article/363695"} application/pdf {"c_size": 44600, "dt": "20160107032903", "offset": 413484441, "surt": "gov,portlandoregon)/fire/article/363695", "url": "http://www.portlandoregon.gov/fire/article/363695", "warc": "MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz"}
+sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT {"c": 1, "d": "2014-06-07T18:00:56", "f": "ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", "o": 720590380, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en"} application/pdf {"c_size": 3727, "dt": "20140607180056", "offset": 720590380, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=264", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz"}
+sha1:23SMLYPFEGIRV6M37FJ5D364TXQXCSMR {"c": 1, "d": "2011-07-12T22:20:32", "f": "WIDE-20110712221302-03146.warc.gz", "o": 222089710, "u": "http://media.dailyuw.com/papers/_091030_7-14_color_web.pdf"} application/pdf {"c_size": 4654708, "dt": "20110712222032", "offset": 222089710, "surt": "com,dailyuw,media)/papers/_091030_7-14_color_web.pdf", "url": "http://media.dailyuw.com/papers/_091030_7-14_color_web.pdf", "warc": "WIDE-20110712221302-crawl413/WIDE-20110712221302-03146.warc.gz"}
+sha1:23SN4XBPSCRPRIHH5UAV45LFCP3VDV3V {"c": 1, "d": "2010-10-28T09:03:57", "f": "WIDE-20101028084449158-00409-23450~ia360921.us.archive.org~9443.warc.gz", "o": 756726028, "u": "http://cdacnoida.in/ASCNT-2010/Language%20Technology/Paper/Reducing%20Errors%20in%20Translation%20using%20Pre-editor%20for%20Indian%20English%20Sentences.pdf"} application/pdf {"c_size": 98408, "dt": "20101028090357", "offset": 756726028, "surt": "in,cdacnoida)/ascnt-2010/language%20technology/paper/reducing%20errors%20in%20translation%20using%20pre-editor%20for%20indian%20english%20sentences.pdf", "url": "http://cdacnoida.in/ASCNT-2010/Language%20Technology/Paper/Reducing%20Errors%20in%20Translation%20using%20Pre-editor%20for%20Indian%20English%20Sentences.pdf", "warc": "WIDE-20101028063239344-00397-00415-ia360921/WIDE-20101028084449158-00409-23450~ia360921.us.archive.org~9443.warc.gz"}
diff --git a/python/tests/test_extraction_ungrobided.py b/python/tests/test_extraction_ungrobided.py
new file mode 100644
index 0000000..366d392
--- /dev/null
+++ b/python/tests/test_extraction_ungrobided.py
@@ -0,0 +1,176 @@
+
+import io
+import json
+import mrjob
+import pytest
+import struct
+import responses
+import happybase_mock
+import wayback.exception
+from unittest import mock
+from common import parse_ungrobided_line
+from extraction_ungrobided import MRExtractUnGrobided
+
+
+FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)
+OK_UNGROBIDED_LINE = b"\t".join((
+ b"sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ",
+ b"""{"c": 1, "d": "2017-07-06T07:54:11", "f": "CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz", "o": 914718776, "u": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250"}""",
+ b"application/pdf",
+ b"""{"c_size": 501, "dt": "20170706075411", "offset": 914718776, "surt": "org,ibc7)/article/file_down.php?mode=article_print&pid=250", "url": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250", "warc": "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz"}""",
+))
+
+with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f:
+ REAL_TEI_XML = f.read()
+
+@pytest.fixture
+def job():
+ """
+ Note: this mock only seems to work with job.run_mapper(), not job.run();
+ the later results in a separate instantiation without the mock?
+ """
+ job = MRExtractUnGrobided(['--no-conf', '-'])
+
+ conn = happybase_mock.Connection()
+ conn.create_table('wbgrp-journal-extract-test',
+ {'file': {}, 'grobid0': {}, 'f': {}})
+ job.hb_table = conn.table('wbgrp-journal-extract-test')
+
+ return job
+
+
+@mock.patch('extraction_ungrobided.MRExtractUnGrobided.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
+@responses.activate
+def test_mapper_single_line(mock_fetch, job):
+
+ responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
+ body=REAL_TEI_XML, content_type='text/xml')
+
+ raw = io.BytesIO(OK_UNGROBIDED_LINE)
+
+ output = io.BytesIO()
+ job.sandbox(stdin=raw, stdout=output)
+
+ job.run_mapper()
+
+ # for debugging tests
+ #print(output.getvalue().decode('utf-8'))
+ #print(list(job.hb_table.scan()))
+
+ # wayback gets FETCH 1x times
+ mock_fetch.assert_called_once_with(
+ "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz",
+ 914718776,
+ 501)
+
+ # grobid gets POST 1x times
+ assert len(responses.calls) == 1
+
+ # HBase
+ assert job.hb_table.row(b'1') == {}
+
+ # Saved extraction info
+ row = job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ')
+
+ assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES)
+ assert row[b'file:mime'] == b"application/pdf"
+ assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
+ # TODO: assert row[b'grobid0:quality'] == None
+ status = json.loads(row[b'grobid0:status'].decode('utf-8'))
+ assert type(status) == type(dict())
+ assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML
+ tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))
+ metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8'))
+ assert tei_json['title'] == metadata['title']
+ assert 'body' in tei_json
+ assert 'body' not in metadata
+
+@mock.patch('extraction_ungrobided.MRExtractUnGrobided.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
+@responses.activate
+def test_mapper_lines(mock_fetch, job):
+
+ responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
+ body=REAL_TEI_XML, content_type='text/xml')
+
+ raw = io.BytesIO(b"""sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI\t{"c": 1, "d": "2016-06-09T00:27:36", "f": "WIDE-20160609001810-06993.warc.gz", "o": 287880616, "u": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf"}\tapplication/pdf\t{"c_size": 68262, "dt": "20160609002736", "offset": 287880616, "surt": "eu,case-research)/sites/default/files/publications/18092393_e-brief_dabrowski_monetary_policy_final_0.pdf", "url": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf", "warc": "WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz"}
+sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D\t{"c": 1, "d": "2016-01-07T03:29:03", "f": "MUSEUM-20160107025230-02354.warc.gz", "o": 413484441, "u": "http://www.portlandoregon.gov/fire/article/363695"}\tapplication/pdf\t{"c_size": 44600, "dt": "20160107032903", "offset": 413484441, "surt": "gov,portlandoregon)/fire/article/363695", "url": "http://www.portlandoregon.gov/fire/article/363695", "warc": "MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz"}
+sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT\t{"c": 1, "d": "2014-06-07T18:00:56", "f": "ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", "o": 720590380, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en"}\tapplication/pdf\t{"c_size": 3727, "dt": "20140607180056", "offset": 720590380, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=264", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz"}""")
+
+
+ output = io.BytesIO()
+ job.sandbox(stdin=raw, stdout=output)
+
+ job.run_mapper()
+
+ # for debugging tests
+ #print(output.getvalue().decode('utf-8'))
+ #print(list(job.hb_table.scan()))
+
+ # grobid gets POST 3x times
+ assert len(responses.calls) == 3
+
+ # wayback gets FETCH 3x times
+ mock_fetch.assert_has_calls((
+ mock.call("WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz", 287880616, 68262),
+ mock.call("MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz", 413484441, 44600),
+ mock.call("ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", 720590380, 3727),
+ ))
+
+ # Saved extraction info
+ assert job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {}
+ assert job.hb_table.row(b'sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI') != {}
+ assert job.hb_table.row(b'sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D') != {}
+ assert job.hb_table.row(b'sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT') != {}
+
+ row = job.hb_table.row(b'sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT')
+ assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES)
+ assert row[b'file:mime'] == b"application/pdf"
+ assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
+ status = json.loads(row[b'grobid0:status'].decode('utf-8'))
+ assert type(status) == type(dict())
+ assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML
+ tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))
+ metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8'))
+ assert tei_json['title'] == metadata['title']
+ assert 'body' in tei_json
+ assert 'body' not in metadata
+
+def test_parse_ungrobided_invalid(job):
+
+ print("space-prefixed line")
+ raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
+ info, status = job.parse_ungrobided_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+ assert 'prefix' in status['reason']
+
+ print("commented line")
+ raw = "#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
+ info, status = job.parse_ungrobided_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+ assert 'prefix' in status['reason']
+
+ print("wrong column count")
+ raw = "a b c d e"
+ info, status = job.parse_ungrobided_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+ assert 'parse' in status['reason']
+
+ print("CDX line, somehow")
+ raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf - 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
+ info, status = job.parse_ungrobided_line(raw)
+ assert info is None
+ print(status)
+ assert status['status'] == "invalid"
+ assert 'parse' in status['reason']
+
+def test_parse_ungrobided_valid():
+
+ parsed = parse_ungrobided_line(OK_UNGROBIDED_LINE.decode('utf-8'))
+ assert parsed['key'] == "sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ"
+ assert parsed['f:c']['u'] == "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250"
+ assert parsed['file:mime'] == "application/pdf"
+ assert parsed['file:cdx']['c_size'] == 501
+ assert parsed['file:cdx']['dt'] == "20170706075411"
diff --git a/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala b/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala
new file mode 100644
index 0000000..7fd3ce0
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala
@@ -0,0 +1,67 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Filters for HBase rows which have not had GROBID run on them, but do have
+// full CDX metadata, and dumps to a TSV for later extraction by the
+// "extraction-ungrobided" job.
+//
+// Does the same horrible join thing that DumpUnGrobidedJob does.
+class DumpUnGrobidedJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val output = args("output")
+
+ val allKeys : TypedPipe[(String,String,String,String)] = DumpUnGrobidedJob.getHBaseKeySource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
+ .read
+ .fromBytesWritable('key, 'c, 'mime, 'cdx)
+ .toTypedPipe[(String,String,String,String)]('key, 'c, 'mime, 'cdx)
+
+ val existingKeys : TypedPipe[(String,Boolean)] = DumpUnGrobidedJob.getHBaseColSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"))
+ .read
+ .fromBytesWritable('key)
+ .toTypedPipe[String]('key)
+ .map{ key => (key, true) }
+
+ val missingKeys : TypedPipe[(String,String,String,String)] = allKeys
+ .groupBy(_._1)
+ .leftJoin(existingKeys.groupBy(_._1))
+ .toTypedPipe
+ .collect { case (key, ((_, c, mime, cdx), None)) => (key, c, mime, cdx) }
+
+ missingKeys
+ .write(TypedTsv[(String,String,String,String)](output))
+
+}
+
+object DumpUnGrobidedJob {
+
+ // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181"
+ def getHBaseColSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbaseTable,
+ zookeeperHosts,
+ List("grobid0:status_code"),
+ SourceMode.SCAN_ALL)
+ }
+
+ def getHBaseKeySource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbaseTable,
+ zookeeperHosts,
+ List("f:c", "file:mime", "file:cdx"),
+ SourceMode.SCAN_ALL)
+ }
+}
diff --git a/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala b/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala
new file mode 100644
index 0000000..8dda5c8
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala
@@ -0,0 +1,72 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.Tsv
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.junit.runner.RunWith
+import org.scalatest.FunSpec
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
+import scala._
+
+@RunWith(classOf[JUnitRunner])
+class DumpUnGrobidedJobTest extends FunSpec with TupleConversions {
+
+ val output = "/tmp/testOutput"
+ val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+ val log = LoggerFactory.getLogger(this.getClass.getName)
+
+ val statusCode: Long = 200
+ val statusBytes = Bytes.toBytes(statusCode)
+
+ val sampleDataGrobid : List[List[Array[Byte]]] = List(
+ ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusBytes),
+ ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusBytes),
+ ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusBytes),
+ ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusBytes),
+ ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusBytes),
+ ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusBytes))
+ .map(pair => List(Bytes.toBytes(pair._1), pair._2))
+
+ val sampleDataFile : List[List[Array[Byte]]] = List(
+ ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""),
+ ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""))
+ .map(pair => List(Bytes.toBytes(pair._1),
+ Bytes.toBytes(pair._2),
+ Bytes.toBytes(pair._3),
+ Bytes.toBytes(pair._4)))
+
+ JobTest("sandcrawler.DumpUnGrobidedJob")
+ .arg("test", "")
+ .arg("app.conf.path", "app.conf")
+ .arg("output", output)
+ .arg("hbase-table", testTable)
+ .arg("zookeeper-hosts", testHost)
+ .arg("debug", "true")
+ .source[Tuple](DumpUnGrobidedJob.getHBaseColSource(testTable, testHost),
+ sampleDataGrobid.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+ .source[Tuple](DumpUnGrobidedJob.getHBaseKeySource(testTable, testHost),
+ sampleDataFile.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+ .sink[Tuple](TypedTsv[(String,String,String,String)](output)) {
+ outputBuffer =>
+ it("should return correct-length list.") {
+ assert(outputBuffer.size === 2)
+ }
+ }
+ .run
+ .finish
+}