From d7830b4a5aad0a59a588e98798711f0e694d50d6 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 25 Sep 2019 17:51:07 -0700 Subject: refactor old python hadoop code into new directory --- python/README.md | 104 --------- python/backfill_hbase_from_cdx.py | 88 -------- python/extraction_cdx_grobid.py | 299 ------------------------- python/extraction_ungrobided.py | 292 ------------------------ python/kafka_grobid_hbase.py | 200 ----------------- python/mrjob.conf | 16 -- python/tests/files/example_ungrobided.tsv | 20 -- python/tests/test_backfill_hbase_from_cdx.py | 74 ------- python/tests/test_extraction_cdx_grobid.py | 319 --------------------------- python/tests/test_extraction_ungrobided.py | 178 --------------- 10 files changed, 1590 deletions(-) delete mode 100644 python/README.md delete mode 100755 python/backfill_hbase_from_cdx.py delete mode 100755 python/extraction_cdx_grobid.py delete mode 100755 python/extraction_ungrobided.py delete mode 100755 python/kafka_grobid_hbase.py delete mode 100644 python/mrjob.conf delete mode 100644 python/tests/files/example_ungrobided.tsv delete mode 100644 python/tests/test_backfill_hbase_from_cdx.py delete mode 100644 python/tests/test_extraction_cdx_grobid.py delete mode 100644 python/tests/test_extraction_ungrobided.py (limited to 'python') diff --git a/python/README.md b/python/README.md deleted file mode 100644 index 198c949..0000000 --- a/python/README.md +++ /dev/null @@ -1,104 +0,0 @@ - -Hadoop streaming map/reduce jobs written in python using the mrjob library. - -## Development and Testing - -System dependencies on Linux (ubuntu/debian): - - sudo apt install -y python3-dev python3-pip python3-wheel libjpeg-dev build-essential - pip3 install --user pipenv - -On macOS (using Homebrew): - - brew install libjpeg pipenv - -You probably need `~/.local/bin` on your `$PATH`. - -Fetch all python dependencies with: - - pipenv install --dev - -Run the tests with: - - pipenv run pytest - -Check test coverage with: - - pytest --cov --cov-report html - # open ./htmlcov/index.html in a browser - -## Troubleshooting - -If you get pipenv errors like: - - AttributeError: '_NamespacePath' object has no attribute 'sort' - - ---------------------------------------- - - Command "python setup.py egg_info" failed with error code 1 in /1/tmp/pip-install-h7lb6tqz/proto-google-cloud-datastore-v1/ - - ☤ ▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉ 0/8 — 00:00:03 - bnewbold@bnewbold-dev$ - bnewbold@bnewbold-dev$ pipenv install --deploy --dev - Installing dependencies from Pipfile.lock (e82980)… - An error occurred while installing proto-google-cloud-logging-v2==0.91.3! Will try again. - An error occurred while installing gapic-google-cloud-error-reporting-v1beta1==0.15.3! Will try again. - An error occurred while installing gapic-google-cloud-datastore-v1==0.15.3! Will try again. - An error occurred while installing proto-google-cloud-datastore-v1==0.90.4! Will try again. - -Then something has gone horribly wrong with your pip/pipenv/python setup. Don't -have a good workaround yet. - -## Running Python Jobs on Hadoop - -The `../please` script automates these steps; you should use that instead. - -When running python streaming jobs on the actual hadoop cluster, we need to -bundle along our python dependencies in a virtual env tarball. Building this -tarball can be done like: - - export PIPENV_VENV_IN_PROJECT=1 - pipenv install --deploy - tar -czf venv-current.tar.gz -C .venv . - -### Extraction Task - -An example actually connecting to HBase from a local machine, with thrift -running on a devbox and GROBID running on a dedicated machine: - - ./extraction_cdx_grobid.py \ - --hbase-table wbgrp-journal-extract-0-qa \ - --hbase-host wbgrp-svc263.us.archive.org \ - --grobid-uri http://wbgrp-svc096.us.archive.org:8070 \ - tests/files/example.cdx - -Running from the cluster (once a ./venv-current.tar.gz tarball exists): - - ./extraction_cdx_grobid.py \ - --hbase-table wbgrp-journal-extract-0-qa \ - --hbase-host wbgrp-svc263.us.archive.org \ - --grobid-uri http://wbgrp-svc096.us.archive.org:8070 \ - -r hadoop \ - -c mrjob.conf \ - --archive venv-current.tar.gz#venv \ - hdfs:///user/bnewbold/journal_crawl_cdx/citeseerx_crawl_2017.cdx - -### Backfill Task - -An example actually connecting to HBase from a local machine, with thrift -running on a devbox: - - ./backfill_hbase_from_cdx.py \ - --hbase-table wbgrp-journal-extract-0-qa \ - --hbase-host wbgrp-svc263.us.archive.org \ - tests/files/example.cdx - -Running from the cluster (once a ./venv-current.tar.gz tarball exists): - - ./backfill_hbase_from_cdx.py \ - --hbase-host wbgrp-svc263.us.archive.org \ - --hbase-table wbgrp-journal-extract-0-qa \ - -r hadoop \ - -c mrjob.conf \ - --archive venv-current.tar.gz#venv \ - hdfs:///user/bnewbold/journal_crawl_cdx/citeseerx_crawl_2017.cdx diff --git a/python/backfill_hbase_from_cdx.py b/python/backfill_hbase_from_cdx.py deleted file mode 100755 index 6b2ec0b..0000000 --- a/python/backfill_hbase_from_cdx.py +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env python3 -""" -Streaming Hadoop script to import CDX metadata into the HBase fulltext table, -primarily for URL-agnostic crawl de-duplication. Takes only "fulltext" file -formats. - -Requires: -- happybase -- mrjob -""" - -import json -import happybase -import mrjob -from mrjob.job import MRJob -from common import parse_cdx_line - - -class MRCDXBackfillHBase(MRJob): - - # CDX lines in; JSON status out - INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol - OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol - - def configure_args(self): - super(MRCDXBackfillHBase, self).configure_args() - - self.add_passthru_arg('--hbase-table', - type=str, - default='wbgrp-journal-extract-0-qa', - help='HBase table to backfill into (must exist)') - self.add_passthru_arg('--hbase-host', - type=str, - default='localhost', - help='HBase thrift API host to connect to') - - def __init__(self, *args, **kwargs): - super(MRCDXBackfillHBase, self).__init__(*args, **kwargs) - self.mime_filter = ['application/pdf'] - self.hb_table = None - - def mapper_init(self): - - if self.hb_table: - return - - try: - host = self.options.hbase_host - # TODO: make these configs accessible from... mrconf.cfg? - hb_conn = happybase.Connection(host=host, transport="framed", - protocol="compact") - except Exception: - raise Exception("Couldn't connect to HBase using host: {}".format(host)) - self.hb_table = hb_conn.table(self.options.hbase_table) - - def mapper(self, _, raw_cdx): - - self.increment_counter('lines', 'total') - - if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or - raw_cdx.startswith('#')): - self.increment_counter('lines', 'invalid') - yield _, dict(status="invalid", reason="line prefix") - return - - info = parse_cdx_line(raw_cdx) - if info is None: - self.increment_counter('lines', 'invalid') - yield _, dict(status="invalid") - return - - if info['file:mime'] not in self.mime_filter: - self.increment_counter('lines', 'skip') - yield _, dict(status="skip", reason="unwanted mimetype") - return - - key = info.pop('key') - info['f:c'] = json.dumps(info['f:c'], sort_keys=True, indent=None) - info['file:cdx'] = json.dumps(info['file:cdx'], - sort_keys=True, indent=None) - - self.hb_table.put(key, info) - self.increment_counter('lines', 'success') - - yield _, dict(status="success") - -if __name__ == '__main__': # pragma: no cover - MRCDXBackfillHBase.run() diff --git a/python/extraction_cdx_grobid.py b/python/extraction_cdx_grobid.py deleted file mode 100755 index 88580e1..0000000 --- a/python/extraction_cdx_grobid.py +++ /dev/null @@ -1,299 +0,0 @@ -#!/usr/bin/env python3 -""" -Streaming Hadoop script to import extract metadata and body from fulltext (eg, -PDF) files using GROBID. Input is a CDX file; results primarly go to HBase, -with status written to configurable output stream. - -Fulltext files are loaded directly from WARC files in petabox, instead of going -through the wayback replay. - -Requires: -- happybase -- mrjob -- wayback/GWB libraries -""" - -# XXX: some broken MRO thing going on in here due to python3 object wrangling -# in `wayback` library. Means we can't run pylint. -# pylint: skip-file - -import os -import xml -import json -import raven -import struct -import requests -import happybase -import mrjob -from mrjob.job import MRJob -import wayback.exception -from http.client import IncompleteRead -from wayback.resourcestore import ResourceStore -from gwb.loader import CDXLoaderFactory - -from common import parse_cdx_line -from grobid2json import teixml2json - -# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable -sentry_client = raven.Client() - -# Specific poison-pill rows we should skip -KEY_DENYLIST = ( - 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format" -) - -class MRExtractCdxGrobid(MRJob): - - # CDX lines in; JSON status out - #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat' - #INPUT_PROTOCOL = mrjob.protocol.RawProtocol - INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol - OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol - - def configure_args(self): - super(MRExtractCdxGrobid, self).configure_args() - - self.add_passthru_arg('--hbase-table', - type=str, - default='wbgrp-journal-extract-0-qa', - help='HBase table to backfill into (must exist)') - self.add_passthru_arg('--hbase-host', - type=str, - default='localhost', - help='HBase thrift API host to connect to') - self.add_passthru_arg('--grobid-uri', - type=str, - default='http://localhost:8070', - help='URI of GROBID API Server') - self.add_passthru_arg('--warc-uri-prefix', - type=str, - default='https://archive.org/serve/', - help='URI where WARCs can be found') - self.add_passthru_arg('--force-existing', - action="store_true", - help='Re-processes (with GROBID) existing lines') - - def __init__(self, *args, **kwargs): - super(MRExtractCdxGrobid, self).__init__(*args, **kwargs) - self.hb_table = None - self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET')) - self.mime_filter = ['application/pdf'] - self.rstore = None - - def grobid_process_fulltext(self, content): - r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument", - files={'input': content}) - return r - - def mapper_init(self): - - if self.hb_table: - return - - sentry_client.tags_context(dict(hbase_table=self.options.hbase_table)) - try: - host = self.options.hbase_host - # TODO: make these configs accessible from... mrconf.cfg? - hb_conn = happybase.Connection(host=host, transport="framed", - protocol="compact") - except Exception: - raise Exception("Couldn't connect to HBase using host: {}".format(host)) - self.hb_table = hb_conn.table(self.options.hbase_table) - - def parse_line(self, raw_cdx): - - if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or - raw_cdx.startswith('#')): - return None, dict(status="invalid", reason="line prefix") - - info = parse_cdx_line(raw_cdx) - if info is None: - return None, dict(status="invalid", reason="CDX parse") - - if info['file:mime'] not in self.mime_filter: - return None, dict(status="skip", reason="mimetype") - - # If warc is not item/file.(w)arc.gz form, skip it - if len(info['file:cdx']['warc'].split('/')) != 2: - return None, dict(status="skip", reason="WARC path not petabox item/file") - - return info, None - - def fetch_warc_content(self, warc_path, offset, c_size): - warc_uri = self.options.warc_uri_prefix + warc_path - if not self.rstore: - self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory( - webdata_secret=self.petabox_webdata_secret, - download_base_url=self.options.warc_uri_prefix)) - try: - gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) - except wayback.exception.ResourceUnavailable: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (ResourceUnavailable)") - except ValueError as ve: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) - except EOFError as eofe: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) - except TypeError as te: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) - # Note: could consider a generic "except Exception" here, as we get so - # many petabox errors. Do want jobs to fail loud and clear when the - # whole cluster is down though. - - if gwb_record.get_status()[0] != 200: - return None, dict(status="error", - reason="archived HTTP response (WARC) was not 200", - warc_status=gwb_record.get_status()[0]) - - try: - raw_content = gwb_record.open_raw_content().read() - except IncompleteRead as ire: - return None, dict(status="error", - reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) - return raw_content, None - - def extract(self, info): - - # Fetch data from WARCs in petabox - original_content, status = self.fetch_warc_content( - info['file:cdx']['warc'], - info['file:cdx']['offset'], - info['file:cdx']['c_size']) - if status: - return None, status - - info['file:size'] = len(original_content) - - # Submit to GROBID - try: - grobid_response = self.grobid_process_fulltext(original_content) - except requests.exceptions.ConnectionError: - return None, dict(status="error", reason="connection to GROBID worker") - - info['grobid0:status_code'] = grobid_response.status_code - - # 4 MByte XML size limit; don't record GROBID status on this path - if len(grobid_response.content) > 4000000: - info['grobid0:status'] = {'status': 'oversize'} - return info, dict(status="oversize", reason="TEI response was too large") - - if grobid_response.status_code != 200: - # response.text is .content decoded as utf-8 - info['grobid0:status'] = dict(status='error', description=grobid_response.text) - return info, dict(status="error", reason="non-200 GROBID HTTP status", - extra=grobid_response.text) - - info['grobid0:status'] = {'status': 'partial'} - info['grobid0:tei_xml'] = grobid_response.content - - # Convert TEI XML to JSON - try: - info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True) - except xml.etree.ElementTree.ParseError: - info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error") - return info, info['grobid0:status'] - except ValueError: - info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content") - return info, info['grobid0:status'] - - tei_metadata = info['grobid0:tei_json'].copy() - for k in ('body', 'annex'): - # Remove fulltext (copywritted) content - tei_metadata.pop(k, None) - info['grobid0:metadata'] = tei_metadata - - # Determine extraction "quality" - # TODO: - - info['grobid0:quality'] = None - info['grobid0:status'] = {'status': 'success'} - - return info, None - - @sentry_client.capture_exceptions - def mapper(self, _, raw_cdx): - """ - 1. parse CDX line - 2. check what is already in hbase - 3. fetch data from wayback - 4. submit to GROBID - 5. convert GROBID response to JSON (and metadata) - 6. determine "quality" - 7. push results to hbase - """ - - self.increment_counter('lines', 'total') - - # Parse line and filter down - info, status = self.parse_line(raw_cdx) - if info is None: - self.increment_counter('lines', status['status']) - yield _, status - return - key = info['key'] - if key in KEY_DENYLIST: - self.increment_counter('lines', 'denylist') - yield _, dict(status='denylist', key=key) - return - - # Note: this may not get "cleared" correctly - sentry_client.extra_context(dict(row_key=key)) - - # Check if we've already processed this line - oldrow = self.hb_table.row(key, - columns=[b'f:c', b'file', b'grobid0:status_code']) - if (oldrow.get(b'grobid0:status_code', None) != None - and not self.options.force_existing): - # This file has already been processed; skip it - self.increment_counter('lines', 'existing') - yield _, dict(status="existing", key=key) - return - - # Do the extraction - info, status = self.extract(info) - if info is None: - self.increment_counter('lines', status['status']) - status['key'] = key - yield _, status - return - extraction_status = status - - # Decide what to bother inserting back into HBase - # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx') - grobid_status_code = info.get('grobid0:status_code', None) - for k in list(info.keys()): - if k.encode('utf-8') in oldrow: - info.pop(k) - - # Convert fields to binary - for k in list(info.keys()): - if info[k] is None: - info.pop(k) - elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', - 'grobid0:metadata'): - assert type(info[k]) == dict - info[k] = json.dumps(info[k], sort_keys=True, indent=None) - elif k in ('file:size', 'grobid0:status_code'): - # encode as int64 in network byte order - if info[k] != {} and info[k] != None: - info[k] = struct.pack('!q', info[k]) - - key = info.pop('key') - self.hb_table.put(key, info) - self.increment_counter('lines', 'success') - - if extraction_status is not None: - yield _, dict(status="partial", key=key, - grobid_status_code=grobid_status_code, - reason=extraction_status['reason']) - else: - yield _, dict(status="success", - grobid_status_code=grobid_status_code, key=key, - extra=extraction_status) - - -if __name__ == '__main__': # pragma: no cover - MRExtractCdxGrobid.run() diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py deleted file mode 100755 index 225e46f..0000000 --- a/python/extraction_ungrobided.py +++ /dev/null @@ -1,292 +0,0 @@ -#!/usr/bin/env python3 -""" -Variant of extraction_cdx_grobid which takes a partial metadata list as input -instead of CDX. - -This task list is dumped by another Hadoop job which scans over the HBase table -quickly, which allows this job to skip a (relatively) expensive HBase read -per-row. - -Requires: -- happybase -- mrjob -- wayback/GWB libraries -""" - -# XXX: some broken MRO thing going on in here due to python3 object wrangling -# in `wayback` library. Means we can't run pylint. -# pylint: skip-file - -import os -import xml -import json -import raven -import struct -import requests -import happybase -import mrjob -from mrjob.job import MRJob -import wayback.exception -from http.client import IncompleteRead -from wayback.resourcestore import ResourceStore -from gwb.loader import CDXLoaderFactory - -from common import parse_ungrobided_line -from grobid2json import teixml2json - -# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable -sentry_client = raven.Client() - -# Specific poison-pill rows we should skip -KEY_DENYLIST = ( - 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format" -) - -class MRExtractUnGrobided(MRJob): - - # "ungrobided" TSV lines in; JSON status out - #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat' - #INPUT_PROTOCOL = mrjob.protocol.RawProtocol - INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol - OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol - - def configure_args(self): - super(MRExtractUnGrobided, self).configure_args() - - self.add_passthru_arg('--hbase-table', - type=str, - default='wbgrp-journal-extract-0-qa', - help='HBase table to backfill into (must exist)') - self.add_passthru_arg('--hbase-host', - type=str, - default='localhost', - help='HBase thrift API host to connect to') - self.add_passthru_arg('--grobid-uri', - type=str, - default='http://localhost:8070', - help='URI of GROBID API Server') - self.add_passthru_arg('--warc-uri-prefix', - type=str, - default='https://archive.org/serve/', - help='URI where WARCs can be found') - - def __init__(self, *args, **kwargs): - super(MRExtractUnGrobided, self).__init__(*args, **kwargs) - self.hb_table = None - self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET')) - self.mime_filter = ['application/pdf'] - self.rstore = None - - def grobid_process_fulltext(self, content): - r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument", - files={'input': content}) - return r - - def mapper_init(self): - - if self.hb_table: - return - - sentry_client.tags_context(dict(hbase_table=self.options.hbase_table)) - try: - host = self.options.hbase_host - # TODO: make these configs accessible from... mrconf.cfg? - hb_conn = happybase.Connection(host=host, transport="framed", - protocol="compact") - except Exception: - raise Exception("Couldn't connect to HBase using host: {}".format(host)) - self.hb_table = hb_conn.table(self.options.hbase_table) - - def parse_ungrobided_line(self, raw_line): - """Line should be TSV and have non-null fields: - - - key (string) - - f:c (string, json) - - file:mime (string) - - file:cdx (string, json) - """ - - if (raw_line.startswith(' ') or raw_line.startswith('#')): - return None, dict(status="invalid", reason="line prefix", input=raw_line) - - info = parse_ungrobided_line(raw_line) - if info is None: - return None, dict(status="invalid", reason="ungrobided parse") - - if info['file:mime'] not in self.mime_filter: - return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime']) - - # If warc is not item/file.(w)arc.gz form, skip it - if len(info['file:cdx']['warc'].split('/')) != 2: - return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc']) - - return info, None - - def fetch_warc_content(self, warc_path, offset, c_size): - warc_uri = self.options.warc_uri_prefix + warc_path - if not self.rstore: - self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory( - webdata_secret=self.petabox_webdata_secret, - download_base_url=self.options.warc_uri_prefix)) - try: - gwb_record = self.rstore.load_resource(warc_uri, offset, c_size) - except wayback.exception.ResourceUnavailable: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (ResourceUnavailable)") - except ValueError as ve: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve)) - except EOFError as eofe: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe)) - except TypeError as te: - return None, dict(status="error", - reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te)) - # Note: could consider a generic "except Exception" here, as we get so - # many petabox errors. Do want jobs to fail loud and clear when the - # whole cluster is down though. - - if gwb_record.get_status()[0] != 200: - return None, dict(status="error", - reason="archived HTTP response (WARC) was not 200", - warc_status=gwb_record.get_status()[0]) - - try: - raw_content = gwb_record.open_raw_content().read() - except IncompleteRead as ire: - return None, dict(status="error", - reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire)) - return raw_content, None - - def extract(self, info): - - # Fetch data from WARCs in petabox - original_content, status = self.fetch_warc_content( - info['file:cdx']['warc'], - info['file:cdx']['offset'], - info['file:cdx']['c_size']) - if status: - return None, status - - info['file:size'] = len(original_content) - - # Submit to GROBID - try: - grobid_response = self.grobid_process_fulltext(original_content) - except requests.exceptions.ConnectionError: - return None, dict(status="error", reason="connection to GROBID worker") - - info['grobid0:status_code'] = grobid_response.status_code - - # 4 MByte XML size limit; don't record GROBID status on this path - if len(grobid_response.content) > 4000000: - info['grobid0:status'] = {'status': 'oversize'} - return info, dict(status="oversize", reason="TEI response was too large") - - if grobid_response.status_code != 200: - # response.text is .content decoded as utf-8 - info['grobid0:status'] = dict(status='error', description=grobid_response.text) - return info, dict(status="error", reason="non-200 GROBID HTTP status", - extra=grobid_response.text) - - info['grobid0:status'] = {'status': 'partial'} - info['grobid0:tei_xml'] = grobid_response.content - - # Convert TEI XML to JSON - try: - info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True) - except xml.etree.ElementTree.ParseError: - info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error") - return info, info['grobid0:status'] - except ValueError: - info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content") - return info, info['grobid0:status'] - - tei_metadata = info['grobid0:tei_json'].copy() - for k in ('body', 'annex'): - # Remove fulltext (copywritted) content - tei_metadata.pop(k, None) - info['grobid0:metadata'] = tei_metadata - - # Determine extraction "quality" - # TODO: - - info['grobid0:quality'] = None - info['grobid0:status'] = {'status': 'success'} - - return info, None - - @sentry_client.capture_exceptions - def mapper(self, _, raw_line): - """ - 1. parse filtered line - 2. fetch data from wayback - 3. submit to GROBID - 4. convert GROBID response to JSON (and metadata) - 6. determine "quality" - 6. push results to hbase - """ - - self.increment_counter('lines', 'total') - - # Parse line and filter down - info, status = self.parse_ungrobided_line(raw_line) - if info is None: - self.increment_counter('lines', status['status']) - yield _, status - return - key = info['key'] - if key in KEY_DENYLIST: - self.increment_counter('lines', 'denylist') - yield _, dict(status='denylist', key=key) - return - - # Note: this may not get "cleared" correctly - sentry_client.extra_context(dict(row_key=key)) - - # Do the extraction - info, status = self.extract(info) - if info is None: - self.increment_counter('lines', status['status']) - status['key'] = key - yield _, status - return - extraction_status = status - - # Decide what to bother inserting back into HBase - # Basically, don't overwrite backfill fields. - grobid_status_code = info.get('grobid0:status_code', None) - for k in list(info.keys()): - if k in ('f:c', 'file:mime', 'file:cdx'): - info.pop(k) - - # Convert fields to binary - for k in list(info.keys()): - if info[k] is None: - info.pop(k) - # NOTE: we're not actually sending these f:*, file:* keys... - elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', - 'grobid0:metadata'): - assert type(info[k]) == dict - info[k] = json.dumps(info[k], sort_keys=True, indent=None) - elif k in ('file:size', 'grobid0:status_code'): - # encode as int64 in network byte order - if info[k] != {} and info[k] != None: - info[k] = struct.pack('!q', info[k]) - - key = info.pop('key') - self.hb_table.put(key, info) - self.increment_counter('lines', 'success') - - if extraction_status is not None: - yield _, dict(status="partial", key=key, - grobid_status_code=grobid_status_code, - reason=extraction_status['reason']) - else: - yield _, dict(status="success", - grobid_status_code=grobid_status_code, key=key, - extra=extraction_status) - - -if __name__ == '__main__': # pragma: no cover - MRExtractUnGrobided.run() diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py deleted file mode 100755 index b52c386..0000000 --- a/python/kafka_grobid_hbase.py +++ /dev/null @@ -1,200 +0,0 @@ -#!/usr/bin/env python3 -""" -Kafka worker that consumes GROBID output from Kafka and pushes into HBase. - -Based on the ungrobided Hadoop job code. - -TODO: binary conversion in 'grobided' topic? shouldn't be, do that here, as well as all TEI extraction/parsing - -Requires: -- requests -- pykafka -""" - -# XXX: some broken MRO thing going on in here due to python3 object wrangling -# in `wayback` library. Means we can't run pylint. -# pylint: skip-file - -import sys -import xml -import json -import raven -import struct -import requests -import argparse -import happybase -import pykafka - -from common import parse_ungrobided_line -from grobid2json import teixml2json - -# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable -sentry_client = raven.Client() - -# Specific poison-pill rows we should skip -KEY_DENYLIST = ( - 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format" -) - -class KafkaGrobidHbaseWorker: - - def __init__(self, kafka_hosts, consume_topic, **kwargs): - self.consume_topic = consume_topic - self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert2') - self.kafka_hosts = kafka_hosts or 'localhost:9092' - self.hbase_host = kwargs['hbase_host'] - self.hbase_table_name = kwargs['hbase_table'] - self.hb_table = None # connection initialized in run() - - def convert_tei(self, info): - - # Convert TEI XML to JSON - try: - info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True) - except xml.etree.ElementTree.ParseError: - info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error") - return info, info['grobid0:status'] - except ValueError: - info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content") - return info, info['grobid0:status'] - - tei_metadata = info['grobid0:tei_json'].copy() - for k in ('body', 'annex'): - # Remove fulltext (copywritted) content - tei_metadata.pop(k, None) - info['grobid0:metadata'] = tei_metadata - return info, None - - def do_work(self, raw_line): - """ - 1. parse info JSON (with XML inside) - 2. do XML -> JSON conversions - 3. push to HBase - - Returns: ??? - """ - - # Parse line and filter down - info = json.loads(raw_line) - key = info['key'] - if key in KEY_DENYLIST: - #self.increment_counter('lines', 'denylist') - return None, dict(status='denylist', key=key) - - # Note: this may not get "cleared" correctly - sentry_client.extra_context(dict(row_key=key)) - print("inserting line to HBase: {}".format(key)) - - if info.get('grobid0:tei_xml'): - # Need to decode 'str' back in to 'bytes' (from JSON serialization) - info['grobid0:tei_xml'] = info['grobid0:tei_xml'].encode('utf-8') - - if info.get('grobid0:status') == 200 and info.get('grobid0:tei_xml'): - info, status = self.convert_tei(info) - - # Decide what to bother inserting back into HBase - # Basically, don't overwrite backfill fields. - grobid_status_code = info.get('grobid0:status_code', None) - for k in list(info.keys()): - if k in ('f:c', 'file:mime', 'file:cdx'): - info.pop(k) - - # Convert fields to binary - for k in list(info.keys()): - if info[k] is None: - info.pop(k) - # NOTE: we're not actually sending these f:*, file:* keys... - elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', - 'grobid0:metadata'): - assert type(info[k]) == dict - info[k] = json.dumps(info[k], sort_keys=True, indent=None) - elif k in ('file:size', 'grobid0:status_code'): - # encode as int64 in network byte order - if info[k] != {} and info[k] != None: - info[k] = struct.pack('!q', info[k]) - - key = info.pop('key') - self.hb_table.put(key, info) - #self.increment_counter('lines', 'success') - - return info, dict(status="success", - grobid_status_code=grobid_status_code, key=key) - - def run(self): - - # 1. start consumer (in managed/balanced fashion, with consumer group) - # 2. for each thingie, do the work; if success publish to kafka; either - # way... print? log? - # 3. repeat! - - print("Starting grobid-hbase-worker...") - try: - host = self.hbase_host - hb_conn = happybase.Connection(host=host, transport="framed", - protocol="compact") - except Exception: - raise Exception("Couldn't connect to HBase using host: {}".format(host)) - self.hb_table = hb_conn.table(self.hbase_table_name) - print("HBase inserting into {}".format(self.hbase_table_name)) - - kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0") - consume_topic = kafka.topics[self.consume_topic] - - sequential_failures = 0 - consumer = consume_topic.get_balanced_consumer( - consumer_group=self.consumer_group, - managed=True, - auto_commit_enable=True, - # needed to avoid MessageSet decode errors - fetch_message_max_bytes=4*1024*1024, - # LATEST because best to miss processing than waste time re-process - auto_offset_reset=pykafka.common.OffsetType.LATEST, - compacted_topic=True) - print("Kafka consuming {} in group {}".format( - self.consume_topic, - self.consumer_group)) - - for msg in consumer: - #print("got a line! ") - grobid_output, status = self.do_work(msg.value.decode('utf-8')) - if grobid_output: - sequential_failures = 0 - else: - sys.stderr.write("Failed to process GROBID extraction output: {}\n".format(status)) - sequential_failures += 1 - if sequential_failures > 20: - sys.stderr.write("too many failures in a row, bailing out\n") - sys.exit(-1) - - -@sentry_client.capture_exceptions -def main(): - - parser = argparse.ArgumentParser() - parser.add_argument('--kafka-hosts', - default="localhost:9092", - help="list of Kafka brokers (host/port) to use") - parser.add_argument('--kafka-env', - default="qa", - help="eg, 'qa' or 'prod'") - parser.add_argument('--consume-topic', - default=None, - help="Kafka topic to consume from") - parser.add_argument('--hbase-table', - type=str, - default='wbgrp-journal-extract-0-qa', - help='HBase table to backfill into (must exist)') - parser.add_argument('--hbase-host', - type=str, - default='localhost', - help='HBase thrift API host to connect to') - args = parser.parse_args() - - if args.consume_topic is None: - args.consume_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env) - - worker = KafkaGrobidHbaseWorker(**args.__dict__) - worker.run() - -if __name__ == '__main__': # pragma: no cover - main() diff --git a/python/mrjob.conf b/python/mrjob.conf deleted file mode 100644 index 6f36196..0000000 --- a/python/mrjob.conf +++ /dev/null @@ -1,16 +0,0 @@ -runners: - local: - upload_files: - - common.py - - grobid2json.py - setup: - - export PYTHONPATH=$PYTHONPATH:venv/lib/python3.5/site-packages/ - hadoop: - no_output: true - upload_files: - - common.py - - grobid2json.py - setup: - - export PYTHONPATH=$PYTHONPATH:venv/lib/python3.5/site-packages/ - cmdenv: - SENTRY_DSN: https://6ab6ad080d034280b863f294e07cc5c6:414ebf0b68634f669d2dc00d7c935699@books-sentry.us.archive.org/9 diff --git a/python/tests/files/example_ungrobided.tsv b/python/tests/files/example_ungrobided.tsv deleted file mode 100644 index 9263b6f..0000000 --- a/python/tests/files/example_ungrobided.tsv +++ /dev/null @@ -1,20 +0,0 @@ -sha1:23LOSW2QVMKUYXPFZBXQHBBNQR45WTMU {"c": 1, "d": "2017-10-27T22:21:13", "f": "PDFS-20171027214658-00155.warc.gz", "o": 984263791, "u": "http://circ.ahajournals.org/content/circulationaha/53/6/965.full.pdf"} application/pdf {"c_size": 1050532, "dt": "20171027222113", "offset": 984263791, "surt": "org,ahajournals,circ)/content/circulationaha/53/6/965.full.pdf", "url": "http://circ.ahajournals.org/content/circulationaha/53/6/965.full.pdf", "warc": "PDFS-20171027125450-crawl815/PDFS-20171027214658-00155.warc.gz"} -sha1:23M2N262M5TWB7F3BVB6ESD3Q26SMPFA {"c": 1, "d": "2012-09-29T07:05:16", "f": "ARCHIVEIT-219-QUARTERLY-FWGZDI-20120929065657-00119-crawling203.us.archive.org-6680.warc.gz", "o": 83570746, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=37&id=225&artlang=en"} application/pdf {"c_size": 3590, "dt": "20120929070516", "offset": 83570746, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=37&id=225", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=37&id=225&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-FWGZDI-00001/ARCHIVEIT-219-QUARTERLY-FWGZDI-20120929065657-00119-crawling203.us.archive.org-6680.warc.gz"} -sha1:23MFQLDGP4WJD67BS7ERMYQUF7TGCG5X {"c": 1, "d": "2017-08-25T15:19:28", "f": "MSAG-PDF-CRAWL-2017-08-04-20170825143335512-08107-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", "o": 573475485, "u": "http://www.bloodjournal.org/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true"} application/pdf {"c_size": 3411470, "dt": "20170825151928", "offset": 573475485, "surt": "org,bloodjournal)/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true", "url": "http://www.bloodjournal.org/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true", "warc": "MSAG-PDF-CRAWL-2017-08-04-20170825114428485-08102-08111-wbgrp-svc284/MSAG-PDF-CRAWL-2017-08-04-20170825143335512-08107-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"} -sha1:23MG6K5Z3JENYCZ2OGTNOJ7QPYANJRCZ {"c": 1, "d": "2017-10-10T11:29:50", "f": "PDFS-20171010110639-00278.warc.gz", "o": 665222706, "u": "http://circ.ahajournals.org/content/circulationaha/53/5/797.full.pdf?download=true"} application/pdf {"c_size": 1107121, "dt": "20171010112950", "offset": 665222706, "surt": "org,ahajournals,circ)/content/circulationaha/53/5/797.full.pdf?download=true", "url": "http://circ.ahajournals.org/content/circulationaha/53/5/797.full.pdf?download=true", "warc": "PDFS-20171010110639-crawl815/PDFS-20171010110639-00278.warc.gz"} -sha1:23MN67JQKDWRUXJMXXJ2GX6O43SQIV76 {"c": 1, "d": "2015-06-06T18:17:08", "f": "ARCHIVEIT-219-QUARTERLY-9582-20150606125000169-00071-wbgrp-crawl067.us.archive.org-6440.warc.gz", "o": 603211220, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=95&artlang=en"} application/pdf {"c_size": 3450, "dt": "20150606181708", "offset": 603211220, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=95", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=95&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-9582-00007/ARCHIVEIT-219-QUARTERLY-9582-20150606125000169-00071-wbgrp-crawl067.us.archive.org-6440.warc.gz"} -sha1:23MQCEOMQS5SMZCXIPNQ4E3ZKOCF6DIM {"c": 1, "d": "2004-01-02T19:45:22", "f": "DU_crawl10.20040102194453.arc.gz", "o": 38062592, "u": "http://www.csupomona.edu:80/~darsadmin/ACTIONITEMS.pdf"} application/pdf {"c_size": 15406, "dt": "20040102194522", "offset": 38062592, "surt": "edu,csupomona)/~darsadmin/actionitems.pdf", "url": "http://www.csupomona.edu:80/~darsadmin/ACTIONITEMS.pdf", "warc": "DU_crawl10.20040102181929-c/DU_crawl10.20040102194453.arc.gz"} -sha1:23NKO4TW6XCESXMSUOOICI3AXVK6Z5BL {"c": 1, "d": "2015-04-27T11:16:33", "f": "eric.ed.gov-inf-20150409-030712-1648j-00064.warc.gz", "o": 3872820483, "u": "http://files.eric.ed.gov/fulltext/ED088632.pdf"} application/pdf {"c_size": 2223528, "dt": "20150427111633", "offset": 3872820483, "surt": "gov,ed,eric,files)/fulltext/ed088632.pdf", "url": "http://files.eric.ed.gov/fulltext/ED088632.pdf", "warc": "archiveteam_archivebot_go_20150427150006/eric.ed.gov-inf-20150409-030712-1648j-00064.warc.gz"} -sha1:23NW2EPLXDA6UBIJLQMM2DJ2K3GL3WTB {"c": 1, "d": "2014-08-13T09:04:30", "f": "WIDE-20140813084304-09684.warc.gz", "o": 726289594, "u": "http://research.sdccd.edu/docs/Accreditation/2012%20Surveys/Employee%20-%20Briefing/Mesa%20College%202012%20Employee%20Feedback%20Survey%20Briefing.pdf"} application/pdf {"c_size": 3472527, "dt": "20140813090430", "offset": 726289594, "surt": "edu,sdccd,research)/docs/accreditation/2012%20surveys/employee%20-%20briefing/mesa%20college%202012%20employee%20feedback%20survey%20briefing.pdf", "url": "http://research.sdccd.edu/docs/Accreditation/2012%20Surveys/Employee%20-%20Briefing/Mesa%20College%202012%20Employee%20Feedback%20Survey%20Briefing.pdf", "warc": "WIDE-20140813074743-crawl424/WIDE-20140813084304-09684.warc.gz"} -sha1:23OQICQ4IBVNHJBWJX5ON3QR26KNMQNT {"c": 1, "d": "2010-06-29T00:35:17", "f": "EDG-20100628234135-01241-ia360918.us.archive.org.warc.gz", "o": 572430160, "u": "http://journalism.arizona.edu/news/rockypt6.pdf"} application/pdf {"c_size": 194706, "dt": "20100629003517", "offset": 572430160, "surt": "edu,arizona,journalism)/news/rockypt6.pdf", "url": "http://journalism.arizona.edu/news/rockypt6.pdf", "warc": "EDG-20100628214935-01235-01243-ia360918-20100629023741-00000/EDG-20100628234135-01241-ia360918.us.archive.org.warc.gz"} -sha1:23OT2AAYPJ3Z5ZOQXVJJTVTKY6QUPICI {"c": 1, "d": "2007-02-25T17:49:01", "f": "38_0_20070225174831_crawl28.arc.gz", "o": 93868066, "u": "http://www.ece.tufts.edu:80/~hopwood/tampa-proceedings.pdf"} application/pdf {"c_size": 162157, "dt": "20070225174901", "offset": 93868066, "surt": "edu,tufts,ece)/~hopwood/tampa-proceedings.pdf", "url": "http://www.ece.tufts.edu:80/~hopwood/tampa-proceedings.pdf", "warc": "38_0_20070225173722_crawl28-c/38_0_20070225174831_crawl28.arc.gz"} -sha1:23OUFX3ZYMF53HY4RUONR5PKN4HXN4O3 {"c": 1, "d": "2004-05-26T06:45:34", "f": "DW_crawl10.20040526064432.arc.gz", "o": 67593910, "u": "http://207.36.165.114:80/NewOrleans/Papers/1301466.pdf"} application/pdf {"c_size": 306879, "dt": "20040526064534", "offset": 67593910, "surt": "114,165,36,207)/neworleans/papers/1301466.pdf", "url": "http://207.36.165.114:80/NewOrleans/Papers/1301466.pdf", "warc": "DW_crawl10.20040525230808-c/DW_crawl10.20040526064432.arc.gz"} -sha1:23PA23UIWCBA3CSTDK2JYX7ZIVOHULFG {"c": 1, "d": "2016-02-05T21:48:33", "f": "NLNZ-NZ-CRAWL-005-20160205211003375-02839-6291~wbgrp-crawl007.us.archive.org~8443.warc.gz", "o": 630386943, "u": "http://homepages.engineering.auckland.ac.nz/~smohan/Outreach/Docs/2013/TTU_REU2013.pdf"} application/pdf {"c_size": 2979614, "dt": "20160205214833", "offset": 630386943, "surt": "nz,ac,auckland,engineering,homepages)/~smohan/outreach/docs/2013/ttu_reu2013.pdf", "url": "http://homepages.engineering.auckland.ac.nz/~smohan/Outreach/Docs/2013/TTU_REU2013.pdf", "warc": "NLNZ-NZ-CRAWL-005-20160205211003375-02839-02848-wbgrp-crawl007/NLNZ-NZ-CRAWL-005-20160205211003375-02839-6291~wbgrp-crawl007.us.archive.org~8443.warc.gz"} -sha1:23PGC74CTD7P6PCF3MZZZJMPYFXRK3OB {"c": 1, "d": "2005-03-17T15:05:51", "f": "EC_binary1_crawl30.20050317150502.arc.gz", "o": 75675778, "u": "http://www.csupomona.edu:80/%7Eengineering/programs/courses/aro/course_outlines/aro_407.pdf"} application/pdf {"c_size": 4842, "dt": "20050317150551", "offset": 75675778, "surt": "edu,csupomona)/~engineering/programs/courses/aro/course_outlines/aro_407.pdf", "url": "http://www.csupomona.edu:80/%7Eengineering/programs/courses/aro/course_outlines/aro_407.pdf", "warc": "EC_binary1_crawl30.20050317135651-c/EC_binary1_crawl30.20050317150502.arc.gz"} -sha1:23PKJEQWUJAIQQSLP3GCCC5VDXN4RFCX {"c": 1, "d": "2017-10-10T23:50:37", "f": "WIDE-20171010214240-16560.warc.gz", "o": 962106404, "u": "http://www.nbrb.by/bv/articles/8997.pdf"} application/pdf {"c_size": 273375, "dt": "20171010235037", "offset": 962106404, "surt": "by,nbrb)/bv/articles/8997.pdf", "url": "http://www.nbrb.by/bv/articles/8997.pdf", "warc": "WIDE-20171010202419-crawl424/WIDE-20171010214240-16560.warc.gz"} -sha1:23PRILJUIQUKHRYQIUYAKSBFPH53FOGT {"c": 1, "d": "2017-07-14T18:51:38", "f": "WIDE-20170714181144-06521.warc.gz", "o": 820382225, "u": "http://carsandracingstuff.com/library/articles/32538.pdf"} application/pdf {"c_size": 125426, "dt": "20170714185138", "offset": 820382225, "surt": "com,carsandracingstuff)/library/articles/32538.pdf", "url": "http://carsandracingstuff.com/library/articles/32538.pdf", "warc": "WIDE-20170714174218-crawl426/WIDE-20170714181144-06521.warc.gz"} -sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI {"c": 1, "d": "2016-06-09T00:27:36", "f": "WIDE-20160609001810-06993.warc.gz", "o": 287880616, "u": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf"} application/pdf {"c_size": 68262, "dt": "20160609002736", "offset": 287880616, "surt": "eu,case-research)/sites/default/files/publications/18092393_e-brief_dabrowski_monetary_policy_final_0.pdf", "url": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf", "warc": "WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz"} -sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D {"c": 1, "d": "2016-01-07T03:29:03", "f": "MUSEUM-20160107025230-02354.warc.gz", "o": 413484441, "u": "http://www.portlandoregon.gov/fire/article/363695"} application/pdf {"c_size": 44600, "dt": "20160107032903", "offset": 413484441, "surt": "gov,portlandoregon)/fire/article/363695", "url": "http://www.portlandoregon.gov/fire/article/363695", "warc": "MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz"} -sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT {"c": 1, "d": "2014-06-07T18:00:56", "f": "ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", "o": 720590380, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en"} application/pdf {"c_size": 3727, "dt": "20140607180056", "offset": 720590380, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=264", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz"} -sha1:23SMLYPFEGIRV6M37FJ5D364TXQXCSMR {"c": 1, "d": "2011-07-12T22:20:32", "f": "WIDE-20110712221302-03146.warc.gz", "o": 222089710, "u": "http://media.dailyuw.com/papers/_091030_7-14_color_web.pdf"} application/pdf {"c_size": 4654708, "dt": "20110712222032", "offset": 222089710, "surt": "com,dailyuw,media)/papers/_091030_7-14_color_web.pdf", "url": "http://media.dailyuw.com/papers/_091030_7-14_color_web.pdf", "warc": "WIDE-20110712221302-crawl413/WIDE-20110712221302-03146.warc.gz"} -sha1:23SN4XBPSCRPRIHH5UAV45LFCP3VDV3V {"c": 1, "d": "2010-10-28T09:03:57", "f": "WIDE-20101028084449158-00409-23450~ia360921.us.archive.org~9443.warc.gz", "o": 756726028, "u": "http://cdacnoida.in/ASCNT-2010/Language%20Technology/Paper/Reducing%20Errors%20in%20Translation%20using%20Pre-editor%20for%20Indian%20English%20Sentences.pdf"} application/pdf {"c_size": 98408, "dt": "20101028090357", "offset": 756726028, "surt": "in,cdacnoida)/ascnt-2010/language%20technology/paper/reducing%20errors%20in%20translation%20using%20pre-editor%20for%20indian%20english%20sentences.pdf", "url": "http://cdacnoida.in/ASCNT-2010/Language%20Technology/Paper/Reducing%20Errors%20in%20Translation%20using%20Pre-editor%20for%20Indian%20English%20Sentences.pdf", "warc": "WIDE-20101028063239344-00397-00415-ia360921/WIDE-20101028084449158-00409-23450~ia360921.us.archive.org~9443.warc.gz"} diff --git a/python/tests/test_backfill_hbase_from_cdx.py b/python/tests/test_backfill_hbase_from_cdx.py deleted file mode 100644 index 070662b..0000000 --- a/python/tests/test_backfill_hbase_from_cdx.py +++ /dev/null @@ -1,74 +0,0 @@ -""" -TODO: could probably refactor to use unittest.mock.patch('happybase') -""" - -import io -import json -import pytest -import mrjob -import happybase_mock -from backfill_hbase_from_cdx import MRCDXBackfillHBase - -@pytest.fixture -def job(): - """ - Note: this mock only seems to work with job.run_mapper(), not job.run(); - the later results in a separate instantiation without the mock? - """ - job = MRCDXBackfillHBase(['--no-conf', '-']) - - conn = happybase_mock.Connection() - conn.create_table('wbgrp-journal-extract-test', - {'file': {}, 'grobid0': {}, 'f': {}}) - job.hb_table = conn.table('wbgrp-journal-extract-test') - - return job - - -def test_some_lines(job): - - raw = io.BytesIO(b""" -com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz -eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz -com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz -""") - - job.sandbox(stdin=raw) - job.run_mapper() - - assert job.hb_table.row(b'1') == {} - # HTTP 301 - assert job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {} - # valid - assert job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') != {} - # text/plain - assert job.hb_table.row(b'sha1:6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD') == {} - - row = job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') - assert row[b'file:mime'] == b"application/pdf" - - file_cdx = json.loads(row[b'file:cdx'].decode('utf-8')) - assert int(file_cdx['offset']) == 328850624 - - f_c = json.loads(row[b'f:c'].decode('utf-8')) - assert f_c['u'] == "http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1" - assert b'i' not in f_c - -def test_parse_cdx_skip(job): - - job.mapper_init() - - print("CDX prefix") - raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.mapper(None, raw).__next__() - assert info is None - assert status['status'] == "invalid" - assert 'prefix' in status['reason'] - - print("mimetype") - raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf text/html 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.mapper(None, raw).__next__() - assert info is None - assert status['status'] == "skip" - assert 'mimetype' in status['reason'] - diff --git a/python/tests/test_extraction_cdx_grobid.py b/python/tests/test_extraction_cdx_grobid.py deleted file mode 100644 index 471d94a..0000000 --- a/python/tests/test_extraction_cdx_grobid.py +++ /dev/null @@ -1,319 +0,0 @@ - -import io -import json -import mrjob -import pytest -import struct -import responses -import happybase_mock -import wayback.exception -from unittest import mock -from extraction_cdx_grobid import MRExtractCdxGrobid - - -FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843) -OK_CDX_LINE = b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz""" - -with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f: - REAL_TEI_XML = f.read() - -@pytest.fixture -def job(): - """ - Note: this mock only seems to work with job.run_mapper(), not job.run(); - the later results in a separate instantiation without the mock? - """ - job = MRExtractCdxGrobid(['--no-conf', '-']) - - conn = happybase_mock.Connection() - conn.create_table('wbgrp-journal-extract-test', - {'file': {}, 'grobid0': {}, 'f': {}}) - job.hb_table = conn.table('wbgrp-journal-extract-test') - - return job - - -@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) -@responses.activate -def test_mapper_lines(mock_fetch, job): - - responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, - body=REAL_TEI_XML, content_type='text/xml') - - raw = io.BytesIO(b""" -com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz -eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz -com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz -""") - - output = io.BytesIO() - job.sandbox(stdin=raw, stdout=output) - - job.run_mapper() - - # for debugging tests - #print(output.getvalue().decode('utf-8')) - #print(list(job.hb_table.scan())) - - # wayback gets FETCH 1x times - mock_fetch.assert_called_once_with( - "CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz", - 328850624, - 854156) - - # grobid gets POST 1x times - assert len(responses.calls) == 1 - - # HBase - assert job.hb_table.row(b'1') == {} - # HTTP 301 - assert job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {} - # valid - assert job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') != {} - # text/plain - assert job.hb_table.row(b'sha1:6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD') == {} - - # Saved extraction info - row = job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') - - assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES) - assert row[b'file:mime'] == b"application/pdf" - assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 - # TODO: assert row[b'grobid0:quality'] == None - status = json.loads(row[b'grobid0:status'].decode('utf-8')) - assert type(status) == type(dict()) - assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML - tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8')) - metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8')) - assert tei_json['title'] == metadata['title'] - assert 'body' in tei_json - assert 'body' not in metadata - -def test_parse_cdx_invalid(job): - - print("valid") - raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_line(raw) - assert status is None - - print("space-prefixed line") - raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_line(raw) - assert info is None - assert status['status'] == "invalid" - assert 'prefix' in status['reason'] - - print("commented line") - raw = "#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_line(raw) - assert info is None - assert status['status'] == "invalid" - assert 'prefix' in status['reason'] - - print("wrong column count") - raw = "a b c d" - info, status = job.parse_line(raw) - assert info is None - assert status['status'] == "invalid" - assert 'parse' in status['reason'] - - print("missing mimetype") - raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf - 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_line(raw) - assert info is None - print(status) - assert status['status'] == "invalid" - assert 'parse' in status['reason'] - - print("HTTP status") - raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 501 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_line(raw) - assert info is None - assert status['status'] == "invalid" - - print("datetime") - raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 501 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_line(raw) - assert info is None - assert status['status'] == "invalid" - - -def test_parse_cdx_skip(job): - - job.mapper_init() - - print("warc format") - raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.mapper(None, raw).__next__() - assert info is None - assert status['status'] == "skip" - assert 'WARC' in status['reason'] - - print("mimetype") - raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf text/html 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.mapper(None, raw).__next__() - assert info is None - assert status['status'] == "skip" - assert 'mimetype' in status['reason'] - - -@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) -@responses.activate -def test_grobid_503(mock_fetch, job): - - status = b'{"status": "done broke due to 503"}' - responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=503, - body=status) - - output = io.BytesIO() - job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) - job.run_mapper() - row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') - status = json.loads(row[b'grobid0:status'].decode('utf-8')) - assert json.loads(row[b'grobid0:status'].decode('utf-8')) == status - - -@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) -@responses.activate -def test_grobid_not_xml(mock_fetch, job): - - payload = b'this is not XML' - responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, - body=payload) - - output = io.BytesIO() - job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) - job.run_mapper() - output = output.getvalue().decode('utf-8') - row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') - assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 - assert row[b'grobid0:tei_xml'] == payload - assert b'grobid0:tei_json' not in row - assert "XML parse error" in output - - -@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) -@responses.activate -def test_grobid_not_tei(mock_fetch, job): - - payload = b'' - responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, - body=payload) - - output = io.BytesIO() - job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) - job.run_mapper() - output = output.getvalue().decode('utf-8') - row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') - assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 - assert row[b'grobid0:tei_xml'] == payload - assert b'grobid0:tei_json' not in row - assert "non-TEI content" in output - - -@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) -def test_grobid_invalid_connection(mock_fetch, job): - - status = b'{"status": "done broke"}' - job.options.grobid_uri = 'http://host.invalid:8070/api/processFulltextDocument' - - output = io.BytesIO() - job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) - job.run_mapper() - output = output.getvalue().decode('utf-8') - assert 'error' in output - assert 'GROBID' in output - assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {} - - -def test_wayback_failure(job): - - job.options.warc_uri_prefix = 'http://host.invalid/' - - output = io.BytesIO() - job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) - job.run_mapper() - output = output.getvalue().decode('utf-8') - assert 'error' in output - assert 'wayback' in output - assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {} - - -@mock.patch('extraction_cdx_grobid.ResourceStore') -def test_wayback_not_found(mock_rs, job): - - # This is... a little convoluded. Basically creating a 404 situation for - # reading a wayback resource. - mock_resource = mock.MagicMock() - mock_resource.get_status.return_value = (404, "Not Found") - mock_rso = mock.MagicMock() - mock_rso.load_resource.return_value = mock_resource - mock_rs.return_value = mock_rso - print(mock_rs().load_resource().get_status()) - - job.options.warc_uri_prefix = 'http://dummy-archive.org/' - - output = io.BytesIO() - job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output) - job.run_mapper() - output = output.getvalue().decode('utf-8') - - print(output) - assert 'error' in output - assert 'not 200' in output - assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {} - - -@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) -@responses.activate -def test_mapper_rerun(mock_fetch, job): - - responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, - body=REAL_TEI_XML, content_type='text/xml') - - output1 = io.BytesIO() - job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1) - job.run_mapper() - output1 = output1.getvalue().decode('utf-8') - - # wayback gets FETCH 1x times - assert mock_fetch.call_count == 1 - # grobid gets POST 1x times - assert len(responses.calls) == 1 - # HBase - assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') != {} - assert 'success' in output1 - - # Run again, same line - output2 = io.BytesIO() - job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output2) - job.run_mapper() - output2 = output2.getvalue().decode('utf-8') - - # wayback still only FETCH 1x times - assert mock_fetch.call_count == 1 - # grobid still only POST 1x times - assert len(responses.calls) == 1 - assert 'existing' in output2 - -@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) -@responses.activate -def test_mapper_previously_backfilled(mock_fetch, job): - - responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, - body=REAL_TEI_XML, content_type='text/xml') - - job.hb_table.put(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ', - {b'f:c': b'{"some": "dict"}', b'file:col': b'bogus'}) - assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') != {} - - output1 = io.BytesIO() - job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1) - job.run_mapper() - output1 = output1.getvalue().decode('utf-8') - - # wayback gets FETCH 1x times - assert mock_fetch.call_count == 1 - # grobid gets POST 1x times - assert len(responses.calls) == 1 - assert 'success' in output1 diff --git a/python/tests/test_extraction_ungrobided.py b/python/tests/test_extraction_ungrobided.py deleted file mode 100644 index cb46d29..0000000 --- a/python/tests/test_extraction_ungrobided.py +++ /dev/null @@ -1,178 +0,0 @@ - -import io -import json -import mrjob -import pytest -import struct -import responses -import happybase_mock -import wayback.exception -from unittest import mock -from common import parse_ungrobided_line -from extraction_ungrobided import MRExtractUnGrobided - - -FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843) -OK_UNGROBIDED_LINE = b"\t".join(( - b"sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ", - b"""{"c": 1, "d": "2017-07-06T07:54:11", "f": "CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz", "o": 914718776, "u": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250"}""", - b"application/pdf", - b"""{"c_size": 501, "dt": "20170706075411", "offset": 914718776, "surt": "org,ibc7)/article/file_down.php?mode=article_print&pid=250", "url": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250", "warc": "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz"}""", -)) - -with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f: - REAL_TEI_XML = f.read() - -@pytest.fixture -def job(): - """ - Note: this mock only seems to work with job.run_mapper(), not job.run(); - the later results in a separate instantiation without the mock? - """ - job = MRExtractUnGrobided(['--no-conf', '-']) - - conn = happybase_mock.Connection() - conn.create_table('wbgrp-journal-extract-test', - {'file': {}, 'grobid0': {}, 'f': {}}) - job.hb_table = conn.table('wbgrp-journal-extract-test') - - return job - - -@mock.patch('extraction_ungrobided.MRExtractUnGrobided.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) -@responses.activate -def test_mapper_single_line(mock_fetch, job): - - responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, - body=REAL_TEI_XML, content_type='text/xml') - - raw = io.BytesIO(OK_UNGROBIDED_LINE) - - output = io.BytesIO() - job.sandbox(stdin=raw, stdout=output) - - job.run_mapper() - - # for debugging tests - #print(output.getvalue().decode('utf-8')) - #print(list(job.hb_table.scan())) - - # wayback gets FETCH 1x times - mock_fetch.assert_called_once_with( - "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz", - 914718776, - 501) - - # grobid gets POST 1x times - assert len(responses.calls) == 1 - - # HBase - assert job.hb_table.row(b'1') == {} - - # Saved extraction info - row = job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') - - assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES) - # file:mime should actually not get clobbered by GROBID updater - #assert row[b'file:mime'] == b"application/pdf" - assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 - # TODO: assert row[b'grobid0:quality'] == None - status = json.loads(row[b'grobid0:status'].decode('utf-8')) - assert type(status) == type(dict()) - assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML - tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8')) - metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8')) - assert tei_json['title'] == metadata['title'] - assert 'body' in tei_json - assert 'body' not in metadata - -@mock.patch('extraction_ungrobided.MRExtractUnGrobided.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None)) -@responses.activate -def test_mapper_lines(mock_fetch, job): - - responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200, - body=REAL_TEI_XML, content_type='text/xml') - - raw = io.BytesIO(b"""sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI\t{"c": 1, "d": "2016-06-09T00:27:36", "f": "WIDE-20160609001810-06993.warc.gz", "o": 287880616, "u": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf"}\tapplication/pdf\t{"c_size": 68262, "dt": "20160609002736", "offset": 287880616, "surt": "eu,case-research)/sites/default/files/publications/18092393_e-brief_dabrowski_monetary_policy_final_0.pdf", "url": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf", "warc": "WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz"} -sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D\t{"c": 1, "d": "2016-01-07T03:29:03", "f": "MUSEUM-20160107025230-02354.warc.gz", "o": 413484441, "u": "http://www.portlandoregon.gov/fire/article/363695"}\tapplication/pdf\t{"c_size": 44600, "dt": "20160107032903", "offset": 413484441, "surt": "gov,portlandoregon)/fire/article/363695", "url": "http://www.portlandoregon.gov/fire/article/363695", "warc": "MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz"} -sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT\t{"c": 1, "d": "2014-06-07T18:00:56", "f": "ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", "o": 720590380, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en"}\tapplication/pdf\t{"c_size": 3727, "dt": "20140607180056", "offset": 720590380, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=264", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz"}""") - - - output = io.BytesIO() - job.sandbox(stdin=raw, stdout=output) - - job.run_mapper() - - # for debugging tests - #print(output.getvalue().decode('utf-8')) - #print(list(job.hb_table.scan())) - - # grobid gets POST 3x times - assert len(responses.calls) == 3 - - # wayback gets FETCH 3x times - mock_fetch.assert_has_calls(( - mock.call("WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz", 287880616, 68262), - mock.call("MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz", 413484441, 44600), - mock.call("ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", 720590380, 3727), - )) - - # Saved extraction info - assert job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {} - assert job.hb_table.row(b'sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI') != {} - assert job.hb_table.row(b'sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D') != {} - assert job.hb_table.row(b'sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT') != {} - - row = job.hb_table.row(b'sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT') - assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES) - # file:mime should actually not get clobbered by GROBID updater - #assert row[b'file:mime'] == b"application/pdf" - assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200 - status = json.loads(row[b'grobid0:status'].decode('utf-8')) - assert type(status) == type(dict()) - assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML - tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8')) - metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8')) - assert tei_json['title'] == metadata['title'] - assert 'body' in tei_json - assert 'body' not in metadata - -def test_parse_ungrobided_invalid(job): - - print("space-prefixed line") - raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_ungrobided_line(raw) - assert info is None - assert status['status'] == "invalid" - assert 'prefix' in status['reason'] - - print("commented line") - raw = "#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_ungrobided_line(raw) - assert info is None - assert status['status'] == "invalid" - assert 'prefix' in status['reason'] - - print("wrong column count") - raw = "a b c d e" - info, status = job.parse_ungrobided_line(raw) - assert info is None - assert status['status'] == "invalid" - assert 'parse' in status['reason'] - - print("CDX line, somehow") - raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf - 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz" - info, status = job.parse_ungrobided_line(raw) - assert info is None - print(status) - assert status['status'] == "invalid" - assert 'parse' in status['reason'] - -def test_parse_ungrobided_valid(): - - parsed = parse_ungrobided_line(OK_UNGROBIDED_LINE.decode('utf-8')) - assert parsed['key'] == "sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ" - assert parsed['f:c']['u'] == "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250" - assert parsed['file:mime'] == "application/pdf" - assert parsed['file:cdx']['c_size'] == 501 - assert parsed['file:cdx']['dt'] == "20170706075411" -- cgit v1.2.3