diff options
-rw-r--r-- | hbase/howto.md | 14 | ||||
-rwxr-xr-x | please | 60 | ||||
-rw-r--r-- | python/common.py | 26 | ||||
-rwxr-xr-x | python/extraction_ungrobided.py | 268 | ||||
-rw-r--r-- | python/tests/files/example_ungrobided.tsv | 20 | ||||
-rw-r--r-- | python/tests/test_extraction_ungrobided.py | 176 | ||||
-rw-r--r-- | scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala | 67 | ||||
-rw-r--r-- | scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala | 72 |
8 files changed, 703 insertions, 0 deletions
diff --git a/hbase/howto.md b/hbase/howto.md index fcf561f..26d33f4 100644 --- a/hbase/howto.md +++ b/hbase/howto.md @@ -26,3 +26,17 @@ To interact with this config, use happybase (python) config: # Test connection conn.tables() +## Queries From Shell + +Fetch all columns for a single row: + + hbase> get 'wbgrp-journal-extract-0-qa', 'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ' + +Fetch multiple columns for a single row, using column families: + + hbase> get 'wbgrp-journal-extract-0-qa', 'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ', 'f', 'file' + +Scan a fixed number of rows (here 5) starting at a specific key prefix, all +columns: + + hbase> scan 'wbgrp-journal-extract-0-qa',{LIMIT=>5,STARTROW=>'sha1:A'} @@ -64,16 +64,47 @@ def run_extract(args): --grobid-uri {grobid_uri} \ -r hadoop \ -c mrjob.conf \ + --output-dir {output} \ + --no-output \ --archive venv-current.tar.gz#venv \ --jobconf mapred.line.input.format.linespermap=8000 \ --jobconf mapreduce.job.queuename=extraction \ --jobconf mapred.task.timeout=3600000 \ {input_cdx} """.format(hbase_host=HBASE_HOST, env=args.env, + output=output, input_cdx=args.input_cdx, grobid_uri=GROBID_URI) subprocess.call(cmd, shell=True) +def run_extract_ungrobided(args): + if args.rebuild: + rebuild_python() + print("Starting extractungrobided job...") + output = "{}/output-{}/{}-extract-ungrobided".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """cd python; + pipenv run ./extraction_ungrobided.py \ + --hbase-host {hbase_host} \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --grobid-uri {grobid_uri} \ + -r hadoop \ + -c mrjob.conf \ + --output-dir {output} \ + --no-output \ + --archive venv-current.tar.gz#venv \ + --jobconf mapred.line.input.format.linespermap=8000 \ + --jobconf mapreduce.job.queuename=extraction \ + --jobconf mapred.task.timeout=3600000 \ + {input_ungrobided} + """.format(hbase_host=HBASE_HOST, env=args.env, + input_ungrobided=args.input_ungrobided, + output=output, + grobid_uri=GROBID_URI) + subprocess.call(cmd, shell=True) + def run_rowcount(args): if args.rebuild: rebuild_scalding() @@ -257,6 +288,27 @@ def run_keysmissingcol(args): env=args.env) subprocess.call(cmd, shell=True) +def run_dumpungrobided(args): + if args.rebuild: + rebuild_scalding() + print("Starting dumpungrobided job...") + output = "{}/output-{}/{}-dumpungrobided".format( + HDFS_DIR, + args.env, + datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S")) + cmd = """hadoop jar \ + scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \ + com.twitter.scalding.Tool sandcrawler.DumpUnGrobidedJob \ + --hdfs \ + --app.conf.path scalding/ia_cluster.conf \ + --hbase-table wbgrp-journal-extract-0-{env} \ + --zookeeper-hosts {zookeeper_hosts} \ + --output {output}""".format( + output=output, + zookeeper_hosts=ZOOKEEPER_HOSTS, + env=args.env) + subprocess.call(cmd, shell=True) + def main(): parser = argparse.ArgumentParser() @@ -281,6 +333,11 @@ def main(): sub_extract.add_argument('input_cdx', help="full HDFS path of CDX file to extract") + sub_extractungrobided = subparsers.add_parser('extract-ungrobided') + sub_extractungrobided.set_defaults(func=run_extract_ungrobided) + sub_extractungrobided.add_argument('input_ungrobided', + help="full HDFS path of 'ungrobided' file to extract") + sub_rowcount = subparsers.add_parser('row-count') sub_rowcount.set_defaults(func=run_rowcount) @@ -320,6 +377,9 @@ def main(): sub_keysmissingcol.add_argument('column', help="column to SCAN for missing keys") + sub_dumpungrobided = subparsers.add_parser('dump-ungrobided') + sub_dumpungrobided.set_defaults(func=run_dumpungrobided) + args = parser.parse_args() if not args.__dict__.get("func"): print("tell me what to do! (try --help)") diff --git a/python/common.py b/python/common.py index 6710044..e596b35 100644 --- a/python/common.py +++ b/python/common.py @@ -1,4 +1,5 @@ +import json from datetime import datetime NORMAL_MIME = ( @@ -71,3 +72,28 @@ def parse_cdx_line(raw_cdx): # 'i' intentionally not set heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix} + +def parse_ungrobided_line(raw_line): + + line = raw_line.strip().split("\t") + if len(line) != 4: + return None + + key = line[0] + mime = normalize_mime(line[2]) + try: + f_c = json.loads(line[1]) + cdx = json.loads(line[3]) + except json.JSONDecodeError: + return None + + if not (key[5:].isalnum() and len(key) == 37 and mime != None): + print(mime) + print(key) + print("FAIL") + return None + + if '-' in (key, mime, f_c, cdx): + return None + + return {'key': key, 'file:mime': mime, 'file:cdx': cdx, 'f:c': f_c} diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py new file mode 100755 index 0000000..4074112 --- /dev/null +++ b/python/extraction_ungrobided.py @@ -0,0 +1,268 @@ +#!/usr/bin/env python3 +""" +Variant of extraction_cdx_grobid which takes a partial metadata list as input +instead of CDX. + +This task list is dumped by another Hadoop job which scans over the HBase table +quickly, which allows this job to skip a (relatively) expensive HBase read +per-row. + +Requires: +- happybase +- mrjob +- wayback/GWB libraries +""" + +# XXX: some broken MRO thing going on in here due to python3 object wrangling +# in `wayback` library. Means we can't run pylint. +# pylint: skip-file + +import xml +import json +import raven +import struct +import requests +import happybase +import mrjob +from mrjob.job import MRJob +import wayback.exception +from wayback.resource import Resource +from wayback.resource import ArcResource +from wayback.resourcestore import ResourceStore +from gwb.loader import CDXLoaderFactory +from common import parse_ungrobided_line +from grobid2json import teixml2json + +# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable +sentry_client = raven.Client() + +# Specific poison-pill rows we should skip +KEY_BLACKLIST = ( + 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format" +) + +class MRExtractUnGrobided(MRJob): + + # "ungrobided" TSV lines in; JSON status out + #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat' + #INPUT_PROTOCOL = mrjob.protocol.RawProtocol + INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol + OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol + + def configure_args(self): + super(MRExtractUnGrobided, self).configure_args() + + self.add_passthru_arg('--hbase-table', + type=str, + default='wbgrp-journal-extract-0-qa', + help='HBase table to backfill into (must exist)') + self.add_passthru_arg('--hbase-host', + type=str, + default='localhost', + help='HBase thrift API host to connect to') + self.add_passthru_arg('--grobid-uri', + type=str, + default='http://localhost:8070', + help='URI of GROBID API Server') + self.add_passthru_arg('--warc-uri-prefix', + type=str, + default='https://archive.org/serve/', + help='URI where WARCs can be found') + + def __init__(self, *args, **kwargs): + super(MRExtractUnGrobided, self).__init__(*args, **kwargs) + self.mime_filter = ['application/pdf'] + self.hb_table = None + + def grobid_process_fulltext(self, content): + r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument", + files={'input': content}) + return r + + def mapper_init(self): + + if self.hb_table: + return + + sentry_client.tags_context(dict(hbase_table=self.options.hbase_table)) + try: + host = self.options.hbase_host + # TODO: make these configs accessible from... mrconf.cfg? + hb_conn = happybase.Connection(host=host, transport="framed", + protocol="compact") + except Exception: + raise Exception("Couldn't connect to HBase using host: {}".format(host)) + self.hb_table = hb_conn.table(self.options.hbase_table) + + def parse_ungrobided_line(self, raw_line): + """Line should be TSV and have non-null fields: + + - key (string) + - f:c (string, json) + - file:mime (string) + - file:cdx (string, json) + """ + + if (raw_line.startswith(' ') or raw_line.startswith('#')): + return None, dict(status="invalid", reason="line prefix", input=raw_line) + + info = parse_ungrobided_line(raw_line) + if info is None: + return None, dict(status="invalid", reason="ungrobided parse") + + if info['file:mime'] not in self.mime_filter: + return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime']) + + # If warc is not item/file.(w)arc.gz form, skip it + if len(info['file:cdx']['warc'].split('/')) != 2: + return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc']) + + return info, None + + def fetch_warc_content(self, warc_path, offset, c_size): + warc_uri = self.options.warc_uri_prefix + warc_path + try: + rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) + gwb_record = rstore.load_resource(warc_uri, offset, c_size) + except wayback.exception.ResourceUnavailable: + return None, dict(status="error", + reason="failed to load file contents from wayback/petabox") + + if gwb_record.get_status()[0] != 200: + return None, dict(status="error", + reason="archived HTTP response (WARC) was not 200", + warc_status=gwb_record.get_status()[0]) + return gwb_record.open_raw_content().read(), None + + def extract(self, info): + + # Fetch data from WARCs in petabox + original_content, status = self.fetch_warc_content( + info['file:cdx']['warc'], + info['file:cdx']['offset'], + info['file:cdx']['c_size']) + if status: + return None, status + + info['file:size'] = len(original_content) + + # Submit to GROBID + try: + grobid_response = self.grobid_process_fulltext(original_content) + except requests.exceptions.ConnectionError: + return None, dict(status="error", reason="connection to GROBID worker") + + info['grobid0:status_code'] = grobid_response.status_code + + # 4 MByte XML size limit; don't record GROBID status on this path + if len(grobid_response.content) > 4000000: + info['grobid0:status'] = {'status': 'oversize'} + return info, dict(status="oversize", reason="TEI response was too large") + + if grobid_response.status_code != 200: + # response.text is .content decoded as utf-8 + info['grobid0:status'] = dict(status='error', description=grobid_response.text) + return info, dict(status="error", reason="non-200 GROBID HTTP status", + extra=grobid_response.text) + + info['grobid0:status'] = {'status': 'partial'} + info['grobid0:tei_xml'] = grobid_response.content + + # Convert TEI XML to JSON + try: + info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True) + except xml.etree.ElementTree.ParseError: + info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error") + return info, info['grobid0:status'] + except ValueError: + info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content") + return info, info['grobid0:status'] + + tei_metadata = info['grobid0:tei_json'].copy() + for k in ('body', 'annex'): + # Remove fulltext (copywritted) content + tei_metadata.pop(k, None) + info['grobid0:metadata'] = tei_metadata + + # Determine extraction "quality" + # TODO: + + info['grobid0:quality'] = None + info['grobid0:status'] = {'status': 'success'} + + return info, None + + @sentry_client.capture_exceptions + def mapper(self, _, raw_line): + """ + 1. parse filtered line + 2. fetch data from wayback + 3. submit to GROBID + 4. convert GROBID response to JSON (and metadata) + 6. determine "quality" + 6. push results to hbase + """ + + self.increment_counter('lines', 'total') + + # Parse line and filter down + info, status = self.parse_ungrobided_line(raw_line) + if info is None: + self.increment_counter('lines', status['status']) + yield _, status + return + key = info['key'] + if key in KEY_BLACKLIST: + self.increment_counter('lines', 'blacklist') + yield _, dict(status='blacklist', key=key) + return + + # Note: this may not get "cleared" correctly + sentry_client.extra_context(dict(row_key=key)) + + # Do the extraction + info, status = self.extract(info) + if info is None: + self.increment_counter('lines', status['status']) + status['key'] = key + yield _, status + return + extraction_status = status + + # Decide what to bother inserting back into HBase + # Basically, don't overwrite backfill fields. + grobid_status_code = info.get('grobid0:status_code', None) + for k in list(info.keys()): + if k.encode('utf-8') in ('f:c', 'file:mime', 'file:cdx'): + info.pop(k) + + # Convert fields to binary + for k in list(info.keys()): + if info[k] is None: + info.pop(k) + # NOTE: we're not actually sending these f:*, file:* keys... + elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', + 'grobid0:metadata'): + assert type(info[k]) == dict + info[k] = json.dumps(info[k], sort_keys=True, indent=None) + elif k in ('file:size', 'grobid0:status_code'): + # encode as int64 in network byte order + if info[k] != {} and info[k] != None: + info[k] = struct.pack('!q', info[k]) + + key = info.pop('key') + self.hb_table.put(key, info) + self.increment_counter('lines', 'success') + + if extraction_status is not None: + yield _, dict(status="partial", key=key, + grobid_status_code=grobid_status_code, + reason=extraction_status['reason']) + else: + yield _, dict(status="success", + grobid_status_code=grobid_status_code, key=key, + extra=extraction_status) + + +if __name__ == '__main__': # pragma: no cover + MRExtractUnGrobided.run() diff --git a/python/tests/files/example_ungrobided.tsv b/python/tests/files/example_ungrobided.tsv new file mode 100644 index 0000000..9263b6f --- /dev/null +++ b/python/tests/files/example_ungrobided.tsv @@ -0,0 +1,20 @@ +sha1:23LOSW2QVMKUYXPFZBXQHBBNQR45WTMU {"c": 1, "d": "2017-10-27T22:21:13", "f": "PDFS-20171027214658-00155.warc.gz", "o": 984263791, "u": "http://circ.ahajournals.org/content/circulationaha/53/6/965.full.pdf"} application/pdf {"c_size": 1050532, "dt": "20171027222113", "offset": 984263791, "surt": "org,ahajournals,circ)/content/circulationaha/53/6/965.full.pdf", "url": "http://circ.ahajournals.org/content/circulationaha/53/6/965.full.pdf", "warc": "PDFS-20171027125450-crawl815/PDFS-20171027214658-00155.warc.gz"} +sha1:23M2N262M5TWB7F3BVB6ESD3Q26SMPFA {"c": 1, "d": "2012-09-29T07:05:16", "f": "ARCHIVEIT-219-QUARTERLY-FWGZDI-20120929065657-00119-crawling203.us.archive.org-6680.warc.gz", "o": 83570746, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=37&id=225&artlang=en"} application/pdf {"c_size": 3590, "dt": "20120929070516", "offset": 83570746, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=37&id=225", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=37&id=225&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-FWGZDI-00001/ARCHIVEIT-219-QUARTERLY-FWGZDI-20120929065657-00119-crawling203.us.archive.org-6680.warc.gz"} +sha1:23MFQLDGP4WJD67BS7ERMYQUF7TGCG5X {"c": 1, "d": "2017-08-25T15:19:28", "f": "MSAG-PDF-CRAWL-2017-08-04-20170825143335512-08107-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", "o": 573475485, "u": "http://www.bloodjournal.org/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true"} application/pdf {"c_size": 3411470, "dt": "20170825151928", "offset": 573475485, "surt": "org,bloodjournal)/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true", "url": "http://www.bloodjournal.org/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true", "warc": "MSAG-PDF-CRAWL-2017-08-04-20170825114428485-08102-08111-wbgrp-svc284/MSAG-PDF-CRAWL-2017-08-04-20170825143335512-08107-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"} +sha1:23MG6K5Z3JENYCZ2OGTNOJ7QPYANJRCZ {"c": 1, "d": "2017-10-10T11:29:50", "f": "PDFS-20171010110639-00278.warc.gz", "o": 665222706, "u": "http://circ.ahajournals.org/content/circulationaha/53/5/797.full.pdf?download=true"} application/pdf {"c_size": 1107121, "dt": "20171010112950", "offset": 665222706, "surt": "org,ahajournals,circ)/content/circulationaha/53/5/797.full.pdf?download=true", "url": "http://circ.ahajournals.org/content/circulationaha/53/5/797.full.pdf?download=true", "warc": "PDFS-20171010110639-crawl815/PDFS-20171010110639-00278.warc.gz"} +sha1:23MN67JQKDWRUXJMXXJ2GX6O43SQIV76 {"c": 1, "d": "2015-06-06T18:17:08", "f": "ARCHIVEIT-219-QUARTERLY-9582-20150606125000169-00071-wbgrp-crawl067.us.archive.org-6440.warc.gz", "o": 603211220, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=95&artlang=en"} application/pdf {"c_size": 3450, "dt": "20150606181708", "offset": 603211220, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=95", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=95&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-9582-00007/ARCHIVEIT-219-QUARTERLY-9582-20150606125000169-00071-wbgrp-crawl067.us.archive.org-6440.warc.gz"} +sha1:23MQCEOMQS5SMZCXIPNQ4E3ZKOCF6DIM {"c": 1, "d": "2004-01-02T19:45:22", "f": "DU_crawl10.20040102194453.arc.gz", "o": 38062592, "u": "http://www.csupomona.edu:80/~darsadmin/ACTIONITEMS.pdf"} application/pdf {"c_size": 15406, "dt": "20040102194522", "offset": 38062592, "surt": "edu,csupomona)/~darsadmin/actionitems.pdf", "url": "http://www.csupomona.edu:80/~darsadmin/ACTIONITEMS.pdf", "warc": "DU_crawl10.20040102181929-c/DU_crawl10.20040102194453.arc.gz"} +sha1:23NKO4TW6XCESXMSUOOICI3AXVK6Z5BL {"c": 1, "d": "2015-04-27T11:16:33", "f": "eric.ed.gov-inf-20150409-030712-1648j-00064.warc.gz", "o": 3872820483, "u": "http://files.eric.ed.gov/fulltext/ED088632.pdf"} application/pdf {"c_size": 2223528, "dt": "20150427111633", "offset": 3872820483, "surt": "gov,ed,eric,files)/fulltext/ed088632.pdf", "url": "http://files.eric.ed.gov/fulltext/ED088632.pdf", "warc": "archiveteam_archivebot_go_20150427150006/eric.ed.gov-inf-20150409-030712-1648j-00064.warc.gz"} +sha1:23NW2EPLXDA6UBIJLQMM2DJ2K3GL3WTB {"c": 1, "d": "2014-08-13T09:04:30", "f": "WIDE-20140813084304-09684.warc.gz", "o": 726289594, "u": "http://research.sdccd.edu/docs/Accreditation/2012%20Surveys/Employee%20-%20Briefing/Mesa%20College%202012%20Employee%20Feedback%20Survey%20Briefing.pdf"} application/pdf {"c_size": 3472527, "dt": "20140813090430", "offset": 726289594, "surt": "edu,sdccd,research)/docs/accreditation/2012%20surveys/employee%20-%20briefing/mesa%20college%202012%20employee%20feedback%20survey%20briefing.pdf", "url": "http://research.sdccd.edu/docs/Accreditation/2012%20Surveys/Employee%20-%20Briefing/Mesa%20College%202012%20Employee%20Feedback%20Survey%20Briefing.pdf", "warc": "WIDE-20140813074743-crawl424/WIDE-20140813084304-09684.warc.gz"} +sha1:23OQICQ4IBVNHJBWJX5ON3QR26KNMQNT {"c": 1, "d": "2010-06-29T00:35:17", "f": "EDG-20100628234135-01241-ia360918.us.archive.org.warc.gz", "o": 572430160, "u": "http://journalism.arizona.edu/news/rockypt6.pdf"} application/pdf {"c_size": 194706, "dt": "20100629003517", "offset": 572430160, "surt": "edu,arizona,journalism)/news/rockypt6.pdf", "url": "http://journalism.arizona.edu/news/rockypt6.pdf", "warc": "EDG-20100628214935-01235-01243-ia360918-20100629023741-00000/EDG-20100628234135-01241-ia360918.us.archive.org.warc.gz"} +sha1:23OT2AAYPJ3Z5ZOQXVJJTVTKY6QUPICI {"c": 1, "d": "2007-02-25T17:49:01", "f": "38_0_20070225174831_crawl28.arc.gz", "o": 93868066, "u": "http://www.ece.tufts.edu:80/~hopwood/tampa-proceedings.pdf"} application/pdf {"c_size": 162157, "dt": "20070225174901", "offset": 93868066, "surt": "edu,tufts,ece)/~hopwood/tampa-proceedings.pdf", "url": "http://www.ece.tufts.edu:80/~hopwood/tampa-proceedings.pdf", "warc": "38_0_20070225173722_crawl28-c/38_0_20070225174831_crawl28.arc.gz"} +sha1:23OUFX3ZYMF53HY4RUONR5PKN4HXN4O3 {"c": 1, "d": "2004-05-26T06:45:34", "f": "DW_crawl10.20040526064432.arc.gz", "o": 67593910, "u": "http://207.36.165.114:80/NewOrleans/Papers/1301466.pdf"} application/pdf {"c_size": 306879, "dt": "20040526064534", "offset": 67593910, "surt": "114,165,36,207)/neworleans/papers/1301466.pdf", "url": "http://207.36.165.114:80/NewOrleans/Papers/1301466.pdf", "warc": "DW_crawl10.20040525230808-c/DW_crawl10.20040526064432.arc.gz"} +sha1:23PA23UIWCBA3CSTDK2JYX7ZIVOHULFG {"c": 1, "d": "2016-02-05T21:48:33", "f": "NLNZ-NZ-CRAWL-005-20160205211003375-02839-6291~wbgrp-crawl007.us.archive.org~8443.warc.gz", "o": 630386943, "u": "http://homepages.engineering.auckland.ac.nz/~smohan/Outreach/Docs/2013/TTU_REU2013.pdf"} application/pdf {"c_size": 2979614, "dt": "20160205214833", "offset": 630386943, "surt": "nz,ac,auckland,engineering,homepages)/~smohan/outreach/docs/2013/ttu_reu2013.pdf", "url": "http://homepages.engineering.auckland.ac.nz/~smohan/Outreach/Docs/2013/TTU_REU2013.pdf", "warc": "NLNZ-NZ-CRAWL-005-20160205211003375-02839-02848-wbgrp-crawl007/NLNZ-NZ-CRAWL-005-20160205211003375-02839-6291~wbgrp-crawl007.us.archive.org~8443.warc.gz"} +sha1:23PGC74CTD7P6PCF3MZZZJMPYFXRK3OB {"c": 1, "d": "2005-03-17T15:05:51", "f": "EC_binary1_crawl30.20050317150502.arc.gz", "o": 75675778, "u": "http://www.csupomona.edu:80/%7Eengineering/programs/courses/aro/course_outlines/aro_407.pdf"} application/pdf {"c_size": 4842, "dt": "20050317150551", "offset": 75675778, "surt": "edu,csupomona)/~engineering/programs/courses/aro/course_outlines/aro_407.pdf", "url": "http://www.csupomona.edu:80/%7Eengineering/programs/courses/aro/course_outlines/aro_407.pdf", "warc": "EC_binary1_crawl30.20050317135651-c/EC_binary1_crawl30.20050317150502.arc.gz"} +sha1:23PKJEQWUJAIQQSLP3GCCC5VDXN4RFCX {"c": 1, "d": "2017-10-10T23:50:37", "f": "WIDE-20171010214240-16560.warc.gz", "o": 962106404, "u": "http://www.nbrb.by/bv/articles/8997.pdf"} application/pdf {"c_size": 273375, "dt": "20171010235037", "offset": 962106404, "surt": "by,nbrb)/bv/articles/8997.pdf", "url": "http://www.nbrb.by/bv/articles/8997.pdf", "warc": "WIDE-20171010202419-crawl424/WIDE-20171010214240-16560.warc.gz"} +sha1:23PRILJUIQUKHRYQIUYAKSBFPH53FOGT {"c": 1, "d": "2017-07-14T18:51:38", "f": "WIDE-20170714181144-06521.warc.gz", "o": 820382225, "u": "http://carsandracingstuff.com/library/articles/32538.pdf"} application/pdf {"c_size": 125426, "dt": "20170714185138", "offset": 820382225, "surt": "com,carsandracingstuff)/library/articles/32538.pdf", "url": "http://carsandracingstuff.com/library/articles/32538.pdf", "warc": "WIDE-20170714174218-crawl426/WIDE-20170714181144-06521.warc.gz"} +sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI {"c": 1, "d": "2016-06-09T00:27:36", "f": "WIDE-20160609001810-06993.warc.gz", "o": 287880616, "u": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf"} application/pdf {"c_size": 68262, "dt": "20160609002736", "offset": 287880616, "surt": "eu,case-research)/sites/default/files/publications/18092393_e-brief_dabrowski_monetary_policy_final_0.pdf", "url": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf", "warc": "WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz"} +sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D {"c": 1, "d": "2016-01-07T03:29:03", "f": "MUSEUM-20160107025230-02354.warc.gz", "o": 413484441, "u": "http://www.portlandoregon.gov/fire/article/363695"} application/pdf {"c_size": 44600, "dt": "20160107032903", "offset": 413484441, "surt": "gov,portlandoregon)/fire/article/363695", "url": "http://www.portlandoregon.gov/fire/article/363695", "warc": "MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz"} +sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT {"c": 1, "d": "2014-06-07T18:00:56", "f": "ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", "o": 720590380, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en"} application/pdf {"c_size": 3727, "dt": "20140607180056", "offset": 720590380, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=264", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz"} +sha1:23SMLYPFEGIRV6M37FJ5D364TXQXCSMR {"c": 1, "d": "2011-07-12T22:20:32", "f": "WIDE-20110712221302-03146.warc.gz", "o": 222089710, "u": "http://media.dailyuw.com/papers/_091030_7-14_color_web.pdf"} application/pdf {"c_size": 4654708, "dt": "20110712222032", "offset": 222089710, "surt": "com,dailyuw,media)/papers/_091030_7-14_color_web.pdf", "url": "http://media.dailyuw.com/papers/_091030_7-14_color_web.pdf", "warc": "WIDE-20110712221302-crawl413/WIDE-20110712221302-03146.warc.gz"} +sha1:23SN4XBPSCRPRIHH5UAV45LFCP3VDV3V {"c": 1, "d": "2010-10-28T09:03:57", "f": "WIDE-20101028084449158-00409-23450~ia360921.us.archive.org~9443.warc.gz", "o": 756726028, "u": "http://cdacnoida.in/ASCNT-2010/Language%20Technology/Paper/Reducing%20Errors%20in%20Translation%20using%20Pre-editor%20for%20Indian%20English%20Sentences.pdf"} application/pdf {"c_size": 98408, "dt": "20101028090357", "offset": 756726028, "surt": "in,cdacnoida)/ascnt-2010/language%20technology/paper/reducing%20errors%20in%20translation%20using%20pre-editor%20for%20indian%20english%20sentences.pdf", "url": "http://cdacnoida.in/ASCNT-2010/Language%20Technology/Paper/Reducing%20Errors%20in%20Translation%20using%20Pre-editor%20for%20Indian%20English%20Sentences.pdf", "warc": "WIDE-20101028063239344-00397-00415-ia360921/WIDE-20101028084449158-00409-23450~ia360921.us.archive.org~9443.warc.gz"} diff --git a/python/tests/test_extraction_ungrobided.py b/python/tests/test_extraction_ungrobided.py new file mode 100644 index 0000000..366d392 --- /dev/null +++ b/python/tests/test_extraction_ungrobided.py @@ -0,0 +1,176 @@ + +import io +import json +import mrjob +import pytest +import struct +import responses +import happybase_mock +import wayback.exception +from unittest import mock +from common import parse_ungrobided_line +from extraction_ungrobided import MRExtractUnGrobided + + +FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843) +OK_UNGROBIDED_LINE = b"\t".join(( + b"sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ", + b"""{"c": 1, "d": "2017-07-06T07:54:11", "f": "CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz", "o": 914718776, "u": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250"}""", + b"application/pdf", + b"""{"c_size": 501, "dt": "20170706075411", "offset": 914718776, "surt": "org,ibc7)/article/file_down.php?mode=article_print&pid=250", "url": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250", "warc": "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz"}""", +)) + +with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f: + REAL_TEI_XML = f.read() + +@pytest.fixture +def job(): + """ + Note: this mock only seems to work with job.run_mapper(), not job.run(); + the later results in a separate instantiation without the mock? + """ + job = MRExtractUnGrobided(['--no-conf', '-']) + + conn = happybase_mock.Connection() + conn.create_table('wbgrp-journal-extract-test', + {'file': {}, 'grobid0': {}, 'f': {}}) + job.hb_table = conn.table('wbgrp-journal-extract-test') + + return job + + +@mock.patch('extraction_ungrobided.MRExtractUnGrobided.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) +@responses.activate +def test_mapper_single_line(mock_fetch, job): + + responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, + body=REAL_TEI_XML, content_type='text/xml') + + raw = io.BytesIO(OK_UNGROBIDED_LINE) + + output = io.BytesIO() + job.sandbox(stdin=raw, stdout=output) + + job.run_mapper() + + # for debugging tests + #print(output.getvalue().decode('utf-8')) + #print(list(job.hb_table.scan())) + + # wayback gets FETCH 1x times + mock_fetch.assert_called_once_with( + "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz", + 914718776, + 501) + + # grobid gets POST 1x times + assert len(responses.calls) == 1 + + # HBase + assert job.hb_table.row(b'1') == {} + + # Saved extraction info + row = job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') + + assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES) + assert row[b'file:mime'] == b"application/pdf" + assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 + # TODO: assert row[b'grobid0:quality'] == None + status = json.loads(row[b'grobid0:status'].decode('utf-8')) + assert type(status) == type(dict()) + assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML + tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8')) + metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8')) + assert tei_json['title'] == metadata['title'] + assert 'body' in tei_json + assert 'body' not in metadata + +@mock.patch('extraction_ungrobided.MRExtractUnGrobided.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) +@responses.activate +def test_mapper_lines(mock_fetch, job): + + responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, + body=REAL_TEI_XML, content_type='text/xml') + + raw = io.BytesIO(b"""sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI\t{"c": 1, "d": "2016-06-09T00:27:36", "f": "WIDE-20160609001810-06993.warc.gz", "o": 287880616, "u": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf"}\tapplication/pdf\t{"c_size": 68262, "dt": "20160609002736", "offset": 287880616, "surt": "eu,case-research)/sites/default/files/publications/18092393_e-brief_dabrowski_monetary_policy_final_0.pdf", "url": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf", "warc": "WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz"} +sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D\t{"c": 1, "d": "2016-01-07T03:29:03", "f": "MUSEUM-20160107025230-02354.warc.gz", "o": 413484441, "u": "http://www.portlandoregon.gov/fire/article/363695"}\tapplication/pdf\t{"c_size": 44600, "dt": "20160107032903", "offset": 413484441, "surt": "gov,portlandoregon)/fire/article/363695", "url": "http://www.portlandoregon.gov/fire/article/363695", "warc": "MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz"} +sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT\t{"c": 1, "d": "2014-06-07T18:00:56", "f": "ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", "o": 720590380, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en"}\tapplication/pdf\t{"c_size": 3727, "dt": "20140607180056", "offset": 720590380, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=264", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz"}""") + + + output = io.BytesIO() + job.sandbox(stdin=raw, stdout=output) + + job.run_mapper() + + # for debugging tests + #print(output.getvalue().decode('utf-8')) + #print(list(job.hb_table.scan())) + + # grobid gets POST 3x times + assert len(responses.calls) == 3 + + # wayback gets FETCH 3x times + mock_fetch.assert_has_calls(( + mock.call("WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz", 287880616, 68262), + mock.call("MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz", 413484441, 44600), + mock.call("ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", 720590380, 3727), + )) + + # Saved extraction info + assert job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {} + assert job.hb_table.row(b'sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI') != {} + assert job.hb_table.row(b'sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D') != {} + assert job.hb_table.row(b'sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT') != {} + + row = job.hb_table.row(b'sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT') + assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES) + assert row[b'file:mime'] == b"application/pdf" + assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 + status = json.loads(row[b'grobid0:status'].decode('utf-8')) + assert type(status) == type(dict()) + assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML + tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8')) + metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8')) + assert tei_json['title'] == metadata['title'] + assert 'body' in tei_json + assert 'body' not in metadata + +def test_parse_ungrobided_invalid(job): + + print("space-prefixed line") + raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" + info, status = job.parse_ungrobided_line(raw) + assert info is None + assert status['status'] == "invalid" + assert 'prefix' in status['reason'] + + print("commented line") + raw = "#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" + info, status = job.parse_ungrobided_line(raw) + assert info is None + assert status['status'] == "invalid" + assert 'prefix' in status['reason'] + + print("wrong column count") + raw = "a b c d e" + info, status = job.parse_ungrobided_line(raw) + assert info is None + assert status['status'] == "invalid" + assert 'parse' in status['reason'] + + print("CDX line, somehow") + raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf - 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" + info, status = job.parse_ungrobided_line(raw) + assert info is None + print(status) + assert status['status'] == "invalid" + assert 'parse' in status['reason'] + +def test_parse_ungrobided_valid(): + + parsed = parse_ungrobided_line(OK_UNGROBIDED_LINE.decode('utf-8')) + assert parsed['key'] == "sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ" + assert parsed['f:c']['u'] == "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250" + assert parsed['file:mime'] == "application/pdf" + assert parsed['file:cdx']['c_size'] == 501 + assert parsed['file:cdx']['dt'] == "20170706075411" diff --git a/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala b/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala new file mode 100644 index 0000000..7fd3ce0 --- /dev/null +++ b/scalding/src/main/scala/sandcrawler/DumpUnGrobidedJob.scala @@ -0,0 +1,67 @@ +package sandcrawler + +import java.util.Properties + +import cascading.property.AppProps +import cascading.tuple.Fields +import com.twitter.scalding._ +import com.twitter.scalding.typed.TDsl._ +import parallelai.spyglass.base.JobBase +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBasePipeConversions +import parallelai.spyglass.hbase.HBaseSource + +// Filters for HBase rows which have not had GROBID run on them, but do have +// full CDX metadata, and dumps to a TSV for later extraction by the +// "extraction-ungrobided" job. +// +// Does the same horrible join thing that DumpUnGrobidedJob does. +class DumpUnGrobidedJob(args: Args) extends JobBase(args) with HBasePipeConversions { + + val output = args("output") + + val allKeys : TypedPipe[(String,String,String,String)] = DumpUnGrobidedJob.getHBaseKeySource( + args("hbase-table"), + args("zookeeper-hosts")) + .read + .fromBytesWritable('key, 'c, 'mime, 'cdx) + .toTypedPipe[(String,String,String,String)]('key, 'c, 'mime, 'cdx) + + val existingKeys : TypedPipe[(String,Boolean)] = DumpUnGrobidedJob.getHBaseColSource( + args("hbase-table"), + args("zookeeper-hosts")) + .read + .fromBytesWritable('key) + .toTypedPipe[String]('key) + .map{ key => (key, true) } + + val missingKeys : TypedPipe[(String,String,String,String)] = allKeys + .groupBy(_._1) + .leftJoin(existingKeys.groupBy(_._1)) + .toTypedPipe + .collect { case (key, ((_, c, mime, cdx), None)) => (key, c, mime, cdx) } + + missingKeys + .write(TypedTsv[(String,String,String,String)](output)) + +} + +object DumpUnGrobidedJob { + + // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181" + def getHBaseColSource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("grobid0:status_code"), + SourceMode.SCAN_ALL) + } + + def getHBaseKeySource(hbaseTable: String, zookeeperHosts: String) : HBaseSource = { + HBaseBuilder.build( + hbaseTable, + zookeeperHosts, + List("f:c", "file:mime", "file:cdx"), + SourceMode.SCAN_ALL) + } +} diff --git a/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala b/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala new file mode 100644 index 0000000..8dda5c8 --- /dev/null +++ b/scalding/src/test/scala/sandcrawler/DumpUnGrobidedJobTest.scala @@ -0,0 +1,72 @@ +package sandcrawler + +import cascading.tuple.Fields +import cascading.tuple.Tuple +import com.twitter.scalding.JobTest +import com.twitter.scalding.Tsv +import com.twitter.scalding.TupleConversions +import com.twitter.scalding.TypedTsv +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.util.Bytes +import org.junit.runner.RunWith +import org.scalatest.FunSpec +import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory +import parallelai.spyglass.hbase.HBaseConstants.SourceMode +import parallelai.spyglass.hbase.HBaseSource +import scala._ + +@RunWith(classOf[JUnitRunner]) +class DumpUnGrobidedJobTest extends FunSpec with TupleConversions { + + val output = "/tmp/testOutput" + val (testTable, testHost) = ("test-table", "dummy-host:2181") + + val log = LoggerFactory.getLogger(this.getClass.getName) + + val statusCode: Long = 200 + val statusBytes = Bytes.toBytes(statusCode) + + val sampleDataGrobid : List[List[Array[Byte]]] = List( + ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusBytes), + ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusBytes), + ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusBytes), + ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusBytes), + ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusBytes), + ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusBytes)) + .map(pair => List(Bytes.toBytes(pair._1), pair._2)) + + val sampleDataFile : List[List[Array[Byte]]] = List( + ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", """{c-json-data}""", "application/pdf", """{cdx-json-data}"""), + ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", """{c-json-data}""", "application/pdf", """{cdx-json-data}""")) + .map(pair => List(Bytes.toBytes(pair._1), + Bytes.toBytes(pair._2), + Bytes.toBytes(pair._3), + Bytes.toBytes(pair._4))) + + JobTest("sandcrawler.DumpUnGrobidedJob") + .arg("test", "") + .arg("app.conf.path", "app.conf") + .arg("output", output) + .arg("hbase-table", testTable) + .arg("zookeeper-hosts", testHost) + .arg("debug", "true") + .source[Tuple](DumpUnGrobidedJob.getHBaseColSource(testTable, testHost), + sampleDataGrobid.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + .source[Tuple](DumpUnGrobidedJob.getHBaseKeySource(testTable, testHost), + sampleDataFile.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*))) + .sink[Tuple](TypedTsv[(String,String,String,String)](output)) { + outputBuffer => + it("should return correct-length list.") { + assert(outputBuffer.size === 2) + } + } + .run + .finish +} |