aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/README.md104
-rwxr-xr-xpython/backfill_hbase_from_cdx.py88
-rwxr-xr-xpython/extraction_cdx_grobid.py299
-rwxr-xr-xpython/extraction_ungrobided.py292
-rwxr-xr-xpython/kafka_grobid_hbase.py200
-rw-r--r--python/mrjob.conf16
-rw-r--r--python/tests/files/example_ungrobided.tsv20
-rw-r--r--python/tests/test_backfill_hbase_from_cdx.py74
-rw-r--r--python/tests/test_extraction_cdx_grobid.py319
-rw-r--r--python/tests/test_extraction_ungrobided.py178
10 files changed, 0 insertions, 1590 deletions
diff --git a/python/README.md b/python/README.md
deleted file mode 100644
index 198c949..0000000
--- a/python/README.md
+++ /dev/null
@@ -1,104 +0,0 @@
-
-Hadoop streaming map/reduce jobs written in python using the mrjob library.
-
-## Development and Testing
-
-System dependencies on Linux (ubuntu/debian):
-
- sudo apt install -y python3-dev python3-pip python3-wheel libjpeg-dev build-essential
- pip3 install --user pipenv
-
-On macOS (using Homebrew):
-
- brew install libjpeg pipenv
-
-You probably need `~/.local/bin` on your `$PATH`.
-
-Fetch all python dependencies with:
-
- pipenv install --dev
-
-Run the tests with:
-
- pipenv run pytest
-
-Check test coverage with:
-
- pytest --cov --cov-report html
- # open ./htmlcov/index.html in a browser
-
-## Troubleshooting
-
-If you get pipenv errors like:
-
- AttributeError: '_NamespacePath' object has no attribute 'sort'
-
- ----------------------------------------
-
- Command "python setup.py egg_info" failed with error code 1 in /1/tmp/pip-install-h7lb6tqz/proto-google-cloud-datastore-v1/
-
- ☤ ▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉ 0/8 — 00:00:03
- bnewbold@bnewbold-dev$
- bnewbold@bnewbold-dev$ pipenv install --deploy --dev
- Installing dependencies from Pipfile.lock (e82980)…
- An error occurred while installing proto-google-cloud-logging-v2==0.91.3! Will try again.
- An error occurred while installing gapic-google-cloud-error-reporting-v1beta1==0.15.3! Will try again.
- An error occurred while installing gapic-google-cloud-datastore-v1==0.15.3! Will try again.
- An error occurred while installing proto-google-cloud-datastore-v1==0.90.4! Will try again.
-
-Then something has gone horribly wrong with your pip/pipenv/python setup. Don't
-have a good workaround yet.
-
-## Running Python Jobs on Hadoop
-
-The `../please` script automates these steps; you should use that instead.
-
-When running python streaming jobs on the actual hadoop cluster, we need to
-bundle along our python dependencies in a virtual env tarball. Building this
-tarball can be done like:
-
- export PIPENV_VENV_IN_PROJECT=1
- pipenv install --deploy
- tar -czf venv-current.tar.gz -C .venv .
-
-### Extraction Task
-
-An example actually connecting to HBase from a local machine, with thrift
-running on a devbox and GROBID running on a dedicated machine:
-
- ./extraction_cdx_grobid.py \
- --hbase-table wbgrp-journal-extract-0-qa \
- --hbase-host wbgrp-svc263.us.archive.org \
- --grobid-uri http://wbgrp-svc096.us.archive.org:8070 \
- tests/files/example.cdx
-
-Running from the cluster (once a ./venv-current.tar.gz tarball exists):
-
- ./extraction_cdx_grobid.py \
- --hbase-table wbgrp-journal-extract-0-qa \
- --hbase-host wbgrp-svc263.us.archive.org \
- --grobid-uri http://wbgrp-svc096.us.archive.org:8070 \
- -r hadoop \
- -c mrjob.conf \
- --archive venv-current.tar.gz#venv \
- hdfs:///user/bnewbold/journal_crawl_cdx/citeseerx_crawl_2017.cdx
-
-### Backfill Task
-
-An example actually connecting to HBase from a local machine, with thrift
-running on a devbox:
-
- ./backfill_hbase_from_cdx.py \
- --hbase-table wbgrp-journal-extract-0-qa \
- --hbase-host wbgrp-svc263.us.archive.org \
- tests/files/example.cdx
-
-Running from the cluster (once a ./venv-current.tar.gz tarball exists):
-
- ./backfill_hbase_from_cdx.py \
- --hbase-host wbgrp-svc263.us.archive.org \
- --hbase-table wbgrp-journal-extract-0-qa \
- -r hadoop \
- -c mrjob.conf \
- --archive venv-current.tar.gz#venv \
- hdfs:///user/bnewbold/journal_crawl_cdx/citeseerx_crawl_2017.cdx
diff --git a/python/backfill_hbase_from_cdx.py b/python/backfill_hbase_from_cdx.py
deleted file mode 100755
index 6b2ec0b..0000000
--- a/python/backfill_hbase_from_cdx.py
+++ /dev/null
@@ -1,88 +0,0 @@
-#!/usr/bin/env python3
-"""
-Streaming Hadoop script to import CDX metadata into the HBase fulltext table,
-primarily for URL-agnostic crawl de-duplication. Takes only "fulltext" file
-formats.
-
-Requires:
-- happybase
-- mrjob
-"""
-
-import json
-import happybase
-import mrjob
-from mrjob.job import MRJob
-from common import parse_cdx_line
-
-
-class MRCDXBackfillHBase(MRJob):
-
- # CDX lines in; JSON status out
- INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
- OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
-
- def configure_args(self):
- super(MRCDXBackfillHBase, self).configure_args()
-
- self.add_passthru_arg('--hbase-table',
- type=str,
- default='wbgrp-journal-extract-0-qa',
- help='HBase table to backfill into (must exist)')
- self.add_passthru_arg('--hbase-host',
- type=str,
- default='localhost',
- help='HBase thrift API host to connect to')
-
- def __init__(self, *args, **kwargs):
- super(MRCDXBackfillHBase, self).__init__(*args, **kwargs)
- self.mime_filter = ['application/pdf']
- self.hb_table = None
-
- def mapper_init(self):
-
- if self.hb_table:
- return
-
- try:
- host = self.options.hbase_host
- # TODO: make these configs accessible from... mrconf.cfg?
- hb_conn = happybase.Connection(host=host, transport="framed",
- protocol="compact")
- except Exception:
- raise Exception("Couldn't connect to HBase using host: {}".format(host))
- self.hb_table = hb_conn.table(self.options.hbase_table)
-
- def mapper(self, _, raw_cdx):
-
- self.increment_counter('lines', 'total')
-
- if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
- raw_cdx.startswith('#')):
- self.increment_counter('lines', 'invalid')
- yield _, dict(status="invalid", reason="line prefix")
- return
-
- info = parse_cdx_line(raw_cdx)
- if info is None:
- self.increment_counter('lines', 'invalid')
- yield _, dict(status="invalid")
- return
-
- if info['file:mime'] not in self.mime_filter:
- self.increment_counter('lines', 'skip')
- yield _, dict(status="skip", reason="unwanted mimetype")
- return
-
- key = info.pop('key')
- info['f:c'] = json.dumps(info['f:c'], sort_keys=True, indent=None)
- info['file:cdx'] = json.dumps(info['file:cdx'],
- sort_keys=True, indent=None)
-
- self.hb_table.put(key, info)
- self.increment_counter('lines', 'success')
-
- yield _, dict(status="success")
-
-if __name__ == '__main__': # pragma: no cover
- MRCDXBackfillHBase.run()
diff --git a/python/extraction_cdx_grobid.py b/python/extraction_cdx_grobid.py
deleted file mode 100755
index 88580e1..0000000
--- a/python/extraction_cdx_grobid.py
+++ /dev/null
@@ -1,299 +0,0 @@
-#!/usr/bin/env python3
-"""
-Streaming Hadoop script to import extract metadata and body from fulltext (eg,
-PDF) files using GROBID. Input is a CDX file; results primarly go to HBase,
-with status written to configurable output stream.
-
-Fulltext files are loaded directly from WARC files in petabox, instead of going
-through the wayback replay.
-
-Requires:
-- happybase
-- mrjob
-- wayback/GWB libraries
-"""
-
-# XXX: some broken MRO thing going on in here due to python3 object wrangling
-# in `wayback` library. Means we can't run pylint.
-# pylint: skip-file
-
-import os
-import xml
-import json
-import raven
-import struct
-import requests
-import happybase
-import mrjob
-from mrjob.job import MRJob
-import wayback.exception
-from http.client import IncompleteRead
-from wayback.resourcestore import ResourceStore
-from gwb.loader import CDXLoaderFactory
-
-from common import parse_cdx_line
-from grobid2json import teixml2json
-
-# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
-sentry_client = raven.Client()
-
-# Specific poison-pill rows we should skip
-KEY_DENYLIST = (
- 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format"
-)
-
-class MRExtractCdxGrobid(MRJob):
-
- # CDX lines in; JSON status out
- #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'
- #INPUT_PROTOCOL = mrjob.protocol.RawProtocol
- INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
- OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
-
- def configure_args(self):
- super(MRExtractCdxGrobid, self).configure_args()
-
- self.add_passthru_arg('--hbase-table',
- type=str,
- default='wbgrp-journal-extract-0-qa',
- help='HBase table to backfill into (must exist)')
- self.add_passthru_arg('--hbase-host',
- type=str,
- default='localhost',
- help='HBase thrift API host to connect to')
- self.add_passthru_arg('--grobid-uri',
- type=str,
- default='http://localhost:8070',
- help='URI of GROBID API Server')
- self.add_passthru_arg('--warc-uri-prefix',
- type=str,
- default='https://archive.org/serve/',
- help='URI where WARCs can be found')
- self.add_passthru_arg('--force-existing',
- action="store_true",
- help='Re-processes (with GROBID) existing lines')
-
- def __init__(self, *args, **kwargs):
- super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
- self.hb_table = None
- self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
- self.mime_filter = ['application/pdf']
- self.rstore = None
-
- def grobid_process_fulltext(self, content):
- r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
- files={'input': content})
- return r
-
- def mapper_init(self):
-
- if self.hb_table:
- return
-
- sentry_client.tags_context(dict(hbase_table=self.options.hbase_table))
- try:
- host = self.options.hbase_host
- # TODO: make these configs accessible from... mrconf.cfg?
- hb_conn = happybase.Connection(host=host, transport="framed",
- protocol="compact")
- except Exception:
- raise Exception("Couldn't connect to HBase using host: {}".format(host))
- self.hb_table = hb_conn.table(self.options.hbase_table)
-
- def parse_line(self, raw_cdx):
-
- if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
- raw_cdx.startswith('#')):
- return None, dict(status="invalid", reason="line prefix")
-
- info = parse_cdx_line(raw_cdx)
- if info is None:
- return None, dict(status="invalid", reason="CDX parse")
-
- if info['file:mime'] not in self.mime_filter:
- return None, dict(status="skip", reason="mimetype")
-
- # If warc is not item/file.(w)arc.gz form, skip it
- if len(info['file:cdx']['warc'].split('/')) != 2:
- return None, dict(status="skip", reason="WARC path not petabox item/file")
-
- return info, None
-
- def fetch_warc_content(self, warc_path, offset, c_size):
- warc_uri = self.options.warc_uri_prefix + warc_path
- if not self.rstore:
- self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
- webdata_secret=self.petabox_webdata_secret,
- download_base_url=self.options.warc_uri_prefix))
- try:
- gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
- except wayback.exception.ResourceUnavailable:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (ResourceUnavailable)")
- except ValueError as ve:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
- except EOFError as eofe:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
- except TypeError as te:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
- # Note: could consider a generic "except Exception" here, as we get so
- # many petabox errors. Do want jobs to fail loud and clear when the
- # whole cluster is down though.
-
- if gwb_record.get_status()[0] != 200:
- return None, dict(status="error",
- reason="archived HTTP response (WARC) was not 200",
- warc_status=gwb_record.get_status()[0])
-
- try:
- raw_content = gwb_record.open_raw_content().read()
- except IncompleteRead as ire:
- return None, dict(status="error",
- reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
- return raw_content, None
-
- def extract(self, info):
-
- # Fetch data from WARCs in petabox
- original_content, status = self.fetch_warc_content(
- info['file:cdx']['warc'],
- info['file:cdx']['offset'],
- info['file:cdx']['c_size'])
- if status:
- return None, status
-
- info['file:size'] = len(original_content)
-
- # Submit to GROBID
- try:
- grobid_response = self.grobid_process_fulltext(original_content)
- except requests.exceptions.ConnectionError:
- return None, dict(status="error", reason="connection to GROBID worker")
-
- info['grobid0:status_code'] = grobid_response.status_code
-
- # 4 MByte XML size limit; don't record GROBID status on this path
- if len(grobid_response.content) > 4000000:
- info['grobid0:status'] = {'status': 'oversize'}
- return info, dict(status="oversize", reason="TEI response was too large")
-
- if grobid_response.status_code != 200:
- # response.text is .content decoded as utf-8
- info['grobid0:status'] = dict(status='error', description=grobid_response.text)
- return info, dict(status="error", reason="non-200 GROBID HTTP status",
- extra=grobid_response.text)
-
- info['grobid0:status'] = {'status': 'partial'}
- info['grobid0:tei_xml'] = grobid_response.content
-
- # Convert TEI XML to JSON
- try:
- info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
- except xml.etree.ElementTree.ParseError:
- info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
- return info, info['grobid0:status']
- except ValueError:
- info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
- return info, info['grobid0:status']
-
- tei_metadata = info['grobid0:tei_json'].copy()
- for k in ('body', 'annex'):
- # Remove fulltext (copywritted) content
- tei_metadata.pop(k, None)
- info['grobid0:metadata'] = tei_metadata
-
- # Determine extraction "quality"
- # TODO:
-
- info['grobid0:quality'] = None
- info['grobid0:status'] = {'status': 'success'}
-
- return info, None
-
- @sentry_client.capture_exceptions
- def mapper(self, _, raw_cdx):
- """
- 1. parse CDX line
- 2. check what is already in hbase
- 3. fetch data from wayback
- 4. submit to GROBID
- 5. convert GROBID response to JSON (and metadata)
- 6. determine "quality"
- 7. push results to hbase
- """
-
- self.increment_counter('lines', 'total')
-
- # Parse line and filter down
- info, status = self.parse_line(raw_cdx)
- if info is None:
- self.increment_counter('lines', status['status'])
- yield _, status
- return
- key = info['key']
- if key in KEY_DENYLIST:
- self.increment_counter('lines', 'denylist')
- yield _, dict(status='denylist', key=key)
- return
-
- # Note: this may not get "cleared" correctly
- sentry_client.extra_context(dict(row_key=key))
-
- # Check if we've already processed this line
- oldrow = self.hb_table.row(key,
- columns=[b'f:c', b'file', b'grobid0:status_code'])
- if (oldrow.get(b'grobid0:status_code', None) != None
- and not self.options.force_existing):
- # This file has already been processed; skip it
- self.increment_counter('lines', 'existing')
- yield _, dict(status="existing", key=key)
- return
-
- # Do the extraction
- info, status = self.extract(info)
- if info is None:
- self.increment_counter('lines', status['status'])
- status['key'] = key
- yield _, status
- return
- extraction_status = status
-
- # Decide what to bother inserting back into HBase
- # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
- grobid_status_code = info.get('grobid0:status_code', None)
- for k in list(info.keys()):
- if k.encode('utf-8') in oldrow:
- info.pop(k)
-
- # Convert fields to binary
- for k in list(info.keys()):
- if info[k] is None:
- info.pop(k)
- elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
- 'grobid0:metadata'):
- assert type(info[k]) == dict
- info[k] = json.dumps(info[k], sort_keys=True, indent=None)
- elif k in ('file:size', 'grobid0:status_code'):
- # encode as int64 in network byte order
- if info[k] != {} and info[k] != None:
- info[k] = struct.pack('!q', info[k])
-
- key = info.pop('key')
- self.hb_table.put(key, info)
- self.increment_counter('lines', 'success')
-
- if extraction_status is not None:
- yield _, dict(status="partial", key=key,
- grobid_status_code=grobid_status_code,
- reason=extraction_status['reason'])
- else:
- yield _, dict(status="success",
- grobid_status_code=grobid_status_code, key=key,
- extra=extraction_status)
-
-
-if __name__ == '__main__': # pragma: no cover
- MRExtractCdxGrobid.run()
diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py
deleted file mode 100755
index 225e46f..0000000
--- a/python/extraction_ungrobided.py
+++ /dev/null
@@ -1,292 +0,0 @@
-#!/usr/bin/env python3
-"""
-Variant of extraction_cdx_grobid which takes a partial metadata list as input
-instead of CDX.
-
-This task list is dumped by another Hadoop job which scans over the HBase table
-quickly, which allows this job to skip a (relatively) expensive HBase read
-per-row.
-
-Requires:
-- happybase
-- mrjob
-- wayback/GWB libraries
-"""
-
-# XXX: some broken MRO thing going on in here due to python3 object wrangling
-# in `wayback` library. Means we can't run pylint.
-# pylint: skip-file
-
-import os
-import xml
-import json
-import raven
-import struct
-import requests
-import happybase
-import mrjob
-from mrjob.job import MRJob
-import wayback.exception
-from http.client import IncompleteRead
-from wayback.resourcestore import ResourceStore
-from gwb.loader import CDXLoaderFactory
-
-from common import parse_ungrobided_line
-from grobid2json import teixml2json
-
-# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
-sentry_client = raven.Client()
-
-# Specific poison-pill rows we should skip
-KEY_DENYLIST = (
- 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format"
-)
-
-class MRExtractUnGrobided(MRJob):
-
- # "ungrobided" TSV lines in; JSON status out
- #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'
- #INPUT_PROTOCOL = mrjob.protocol.RawProtocol
- INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
- OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
-
- def configure_args(self):
- super(MRExtractUnGrobided, self).configure_args()
-
- self.add_passthru_arg('--hbase-table',
- type=str,
- default='wbgrp-journal-extract-0-qa',
- help='HBase table to backfill into (must exist)')
- self.add_passthru_arg('--hbase-host',
- type=str,
- default='localhost',
- help='HBase thrift API host to connect to')
- self.add_passthru_arg('--grobid-uri',
- type=str,
- default='http://localhost:8070',
- help='URI of GROBID API Server')
- self.add_passthru_arg('--warc-uri-prefix',
- type=str,
- default='https://archive.org/serve/',
- help='URI where WARCs can be found')
-
- def __init__(self, *args, **kwargs):
- super(MRExtractUnGrobided, self).__init__(*args, **kwargs)
- self.hb_table = None
- self.petabox_webdata_secret = kwargs.get('petabox_webdata_secret', os.environ.get('PETABOX_WEBDATA_SECRET'))
- self.mime_filter = ['application/pdf']
- self.rstore = None
-
- def grobid_process_fulltext(self, content):
- r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
- files={'input': content})
- return r
-
- def mapper_init(self):
-
- if self.hb_table:
- return
-
- sentry_client.tags_context(dict(hbase_table=self.options.hbase_table))
- try:
- host = self.options.hbase_host
- # TODO: make these configs accessible from... mrconf.cfg?
- hb_conn = happybase.Connection(host=host, transport="framed",
- protocol="compact")
- except Exception:
- raise Exception("Couldn't connect to HBase using host: {}".format(host))
- self.hb_table = hb_conn.table(self.options.hbase_table)
-
- def parse_ungrobided_line(self, raw_line):
- """Line should be TSV and have non-null fields:
-
- - key (string)
- - f:c (string, json)
- - file:mime (string)
- - file:cdx (string, json)
- """
-
- if (raw_line.startswith(' ') or raw_line.startswith('#')):
- return None, dict(status="invalid", reason="line prefix", input=raw_line)
-
- info = parse_ungrobided_line(raw_line)
- if info is None:
- return None, dict(status="invalid", reason="ungrobided parse")
-
- if info['file:mime'] not in self.mime_filter:
- return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime'])
-
- # If warc is not item/file.(w)arc.gz form, skip it
- if len(info['file:cdx']['warc'].split('/')) != 2:
- return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc'])
-
- return info, None
-
- def fetch_warc_content(self, warc_path, offset, c_size):
- warc_uri = self.options.warc_uri_prefix + warc_path
- if not self.rstore:
- self.rstore = ResourceStore(loaderfactory=CDXLoaderFactory(
- webdata_secret=self.petabox_webdata_secret,
- download_base_url=self.options.warc_uri_prefix))
- try:
- gwb_record = self.rstore.load_resource(warc_uri, offset, c_size)
- except wayback.exception.ResourceUnavailable:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (ResourceUnavailable)")
- except ValueError as ve:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (ValueError: {})".format(ve))
- except EOFError as eofe:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (EOFError: {})".format(eofe))
- except TypeError as te:
- return None, dict(status="error",
- reason="failed to load file contents from wayback/petabox (TypeError: {}; likely a bug in wayback python code)".format(te))
- # Note: could consider a generic "except Exception" here, as we get so
- # many petabox errors. Do want jobs to fail loud and clear when the
- # whole cluster is down though.
-
- if gwb_record.get_status()[0] != 200:
- return None, dict(status="error",
- reason="archived HTTP response (WARC) was not 200",
- warc_status=gwb_record.get_status()[0])
-
- try:
- raw_content = gwb_record.open_raw_content().read()
- except IncompleteRead as ire:
- return None, dict(status="error",
- reason="failed to read actual file contents from wayback/petabox (IncompleteRead: {})".format(ire))
- return raw_content, None
-
- def extract(self, info):
-
- # Fetch data from WARCs in petabox
- original_content, status = self.fetch_warc_content(
- info['file:cdx']['warc'],
- info['file:cdx']['offset'],
- info['file:cdx']['c_size'])
- if status:
- return None, status
-
- info['file:size'] = len(original_content)
-
- # Submit to GROBID
- try:
- grobid_response = self.grobid_process_fulltext(original_content)
- except requests.exceptions.ConnectionError:
- return None, dict(status="error", reason="connection to GROBID worker")
-
- info['grobid0:status_code'] = grobid_response.status_code
-
- # 4 MByte XML size limit; don't record GROBID status on this path
- if len(grobid_response.content) > 4000000:
- info['grobid0:status'] = {'status': 'oversize'}
- return info, dict(status="oversize", reason="TEI response was too large")
-
- if grobid_response.status_code != 200:
- # response.text is .content decoded as utf-8
- info['grobid0:status'] = dict(status='error', description=grobid_response.text)
- return info, dict(status="error", reason="non-200 GROBID HTTP status",
- extra=grobid_response.text)
-
- info['grobid0:status'] = {'status': 'partial'}
- info['grobid0:tei_xml'] = grobid_response.content
-
- # Convert TEI XML to JSON
- try:
- info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
- except xml.etree.ElementTree.ParseError:
- info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
- return info, info['grobid0:status']
- except ValueError:
- info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
- return info, info['grobid0:status']
-
- tei_metadata = info['grobid0:tei_json'].copy()
- for k in ('body', 'annex'):
- # Remove fulltext (copywritted) content
- tei_metadata.pop(k, None)
- info['grobid0:metadata'] = tei_metadata
-
- # Determine extraction "quality"
- # TODO:
-
- info['grobid0:quality'] = None
- info['grobid0:status'] = {'status': 'success'}
-
- return info, None
-
- @sentry_client.capture_exceptions
- def mapper(self, _, raw_line):
- """
- 1. parse filtered line
- 2. fetch data from wayback
- 3. submit to GROBID
- 4. convert GROBID response to JSON (and metadata)
- 6. determine "quality"
- 6. push results to hbase
- """
-
- self.increment_counter('lines', 'total')
-
- # Parse line and filter down
- info, status = self.parse_ungrobided_line(raw_line)
- if info is None:
- self.increment_counter('lines', status['status'])
- yield _, status
- return
- key = info['key']
- if key in KEY_DENYLIST:
- self.increment_counter('lines', 'denylist')
- yield _, dict(status='denylist', key=key)
- return
-
- # Note: this may not get "cleared" correctly
- sentry_client.extra_context(dict(row_key=key))
-
- # Do the extraction
- info, status = self.extract(info)
- if info is None:
- self.increment_counter('lines', status['status'])
- status['key'] = key
- yield _, status
- return
- extraction_status = status
-
- # Decide what to bother inserting back into HBase
- # Basically, don't overwrite backfill fields.
- grobid_status_code = info.get('grobid0:status_code', None)
- for k in list(info.keys()):
- if k in ('f:c', 'file:mime', 'file:cdx'):
- info.pop(k)
-
- # Convert fields to binary
- for k in list(info.keys()):
- if info[k] is None:
- info.pop(k)
- # NOTE: we're not actually sending these f:*, file:* keys...
- elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
- 'grobid0:metadata'):
- assert type(info[k]) == dict
- info[k] = json.dumps(info[k], sort_keys=True, indent=None)
- elif k in ('file:size', 'grobid0:status_code'):
- # encode as int64 in network byte order
- if info[k] != {} and info[k] != None:
- info[k] = struct.pack('!q', info[k])
-
- key = info.pop('key')
- self.hb_table.put(key, info)
- self.increment_counter('lines', 'success')
-
- if extraction_status is not None:
- yield _, dict(status="partial", key=key,
- grobid_status_code=grobid_status_code,
- reason=extraction_status['reason'])
- else:
- yield _, dict(status="success",
- grobid_status_code=grobid_status_code, key=key,
- extra=extraction_status)
-
-
-if __name__ == '__main__': # pragma: no cover
- MRExtractUnGrobided.run()
diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py
deleted file mode 100755
index b52c386..0000000
--- a/python/kafka_grobid_hbase.py
+++ /dev/null
@@ -1,200 +0,0 @@
-#!/usr/bin/env python3
-"""
-Kafka worker that consumes GROBID output from Kafka and pushes into HBase.
-
-Based on the ungrobided Hadoop job code.
-
-TODO: binary conversion in 'grobided' topic? shouldn't be, do that here, as well as all TEI extraction/parsing
-
-Requires:
-- requests
-- pykafka
-"""
-
-# XXX: some broken MRO thing going on in here due to python3 object wrangling
-# in `wayback` library. Means we can't run pylint.
-# pylint: skip-file
-
-import sys
-import xml
-import json
-import raven
-import struct
-import requests
-import argparse
-import happybase
-import pykafka
-
-from common import parse_ungrobided_line
-from grobid2json import teixml2json
-
-# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
-sentry_client = raven.Client()
-
-# Specific poison-pill rows we should skip
-KEY_DENYLIST = (
- 'sha1:DLCCSMMVTCCIR6LRXHEQLZ4PWO6NG2YT', # "failed to guess ARC header format"
-)
-
-class KafkaGrobidHbaseWorker:
-
- def __init__(self, kafka_hosts, consume_topic, **kwargs):
- self.consume_topic = consume_topic
- self.consumer_group = kwargs.get('consumer_group', 'grobid-hbase-insert2')
- self.kafka_hosts = kafka_hosts or 'localhost:9092'
- self.hbase_host = kwargs['hbase_host']
- self.hbase_table_name = kwargs['hbase_table']
- self.hb_table = None # connection initialized in run()
-
- def convert_tei(self, info):
-
- # Convert TEI XML to JSON
- try:
- info['grobid0:tei_json'] = teixml2json(info['grobid0:tei_xml'], encumbered=True)
- except xml.etree.ElementTree.ParseError:
- info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML parse error")
- return info, info['grobid0:status']
- except ValueError:
- info['grobid0:status'] = dict(status="fail", reason="GROBID 200 XML non-TEI content")
- return info, info['grobid0:status']
-
- tei_metadata = info['grobid0:tei_json'].copy()
- for k in ('body', 'annex'):
- # Remove fulltext (copywritted) content
- tei_metadata.pop(k, None)
- info['grobid0:metadata'] = tei_metadata
- return info, None
-
- def do_work(self, raw_line):
- """
- 1. parse info JSON (with XML inside)
- 2. do XML -> JSON conversions
- 3. push to HBase
-
- Returns: ???
- """
-
- # Parse line and filter down
- info = json.loads(raw_line)
- key = info['key']
- if key in KEY_DENYLIST:
- #self.increment_counter('lines', 'denylist')
- return None, dict(status='denylist', key=key)
-
- # Note: this may not get "cleared" correctly
- sentry_client.extra_context(dict(row_key=key))
- print("inserting line to HBase: {}".format(key))
-
- if info.get('grobid0:tei_xml'):
- # Need to decode 'str' back in to 'bytes' (from JSON serialization)
- info['grobid0:tei_xml'] = info['grobid0:tei_xml'].encode('utf-8')
-
- if info.get('grobid0:status') == 200 and info.get('grobid0:tei_xml'):
- info, status = self.convert_tei(info)
-
- # Decide what to bother inserting back into HBase
- # Basically, don't overwrite backfill fields.
- grobid_status_code = info.get('grobid0:status_code', None)
- for k in list(info.keys()):
- if k in ('f:c', 'file:mime', 'file:cdx'):
- info.pop(k)
-
- # Convert fields to binary
- for k in list(info.keys()):
- if info[k] is None:
- info.pop(k)
- # NOTE: we're not actually sending these f:*, file:* keys...
- elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
- 'grobid0:metadata'):
- assert type(info[k]) == dict
- info[k] = json.dumps(info[k], sort_keys=True, indent=None)
- elif k in ('file:size', 'grobid0:status_code'):
- # encode as int64 in network byte order
- if info[k] != {} and info[k] != None:
- info[k] = struct.pack('!q', info[k])
-
- key = info.pop('key')
- self.hb_table.put(key, info)
- #self.increment_counter('lines', 'success')
-
- return info, dict(status="success",
- grobid_status_code=grobid_status_code, key=key)
-
- def run(self):
-
- # 1. start consumer (in managed/balanced fashion, with consumer group)
- # 2. for each thingie, do the work; if success publish to kafka; either
- # way... print? log?
- # 3. repeat!
-
- print("Starting grobid-hbase-worker...")
- try:
- host = self.hbase_host
- hb_conn = happybase.Connection(host=host, transport="framed",
- protocol="compact")
- except Exception:
- raise Exception("Couldn't connect to HBase using host: {}".format(host))
- self.hb_table = hb_conn.table(self.hbase_table_name)
- print("HBase inserting into {}".format(self.hbase_table_name))
-
- kafka = pykafka.KafkaClient(hosts=self.kafka_hosts, broker_version="2.0.0")
- consume_topic = kafka.topics[self.consume_topic]
-
- sequential_failures = 0
- consumer = consume_topic.get_balanced_consumer(
- consumer_group=self.consumer_group,
- managed=True,
- auto_commit_enable=True,
- # needed to avoid MessageSet decode errors
- fetch_message_max_bytes=4*1024*1024,
- # LATEST because best to miss processing than waste time re-process
- auto_offset_reset=pykafka.common.OffsetType.LATEST,
- compacted_topic=True)
- print("Kafka consuming {} in group {}".format(
- self.consume_topic,
- self.consumer_group))
-
- for msg in consumer:
- #print("got a line! ")
- grobid_output, status = self.do_work(msg.value.decode('utf-8'))
- if grobid_output:
- sequential_failures = 0
- else:
- sys.stderr.write("Failed to process GROBID extraction output: {}\n".format(status))
- sequential_failures += 1
- if sequential_failures > 20:
- sys.stderr.write("too many failures in a row, bailing out\n")
- sys.exit(-1)
-
-
-@sentry_client.capture_exceptions
-def main():
-
- parser = argparse.ArgumentParser()
- parser.add_argument('--kafka-hosts',
- default="localhost:9092",
- help="list of Kafka brokers (host/port) to use")
- parser.add_argument('--kafka-env',
- default="qa",
- help="eg, 'qa' or 'prod'")
- parser.add_argument('--consume-topic',
- default=None,
- help="Kafka topic to consume from")
- parser.add_argument('--hbase-table',
- type=str,
- default='wbgrp-journal-extract-0-qa',
- help='HBase table to backfill into (must exist)')
- parser.add_argument('--hbase-host',
- type=str,
- default='localhost',
- help='HBase thrift API host to connect to')
- args = parser.parse_args()
-
- if args.consume_topic is None:
- args.consume_topic = "sandcrawler-{}.grobid-output".format(args.kafka_env)
-
- worker = KafkaGrobidHbaseWorker(**args.__dict__)
- worker.run()
-
-if __name__ == '__main__': # pragma: no cover
- main()
diff --git a/python/mrjob.conf b/python/mrjob.conf
deleted file mode 100644
index 6f36196..0000000
--- a/python/mrjob.conf
+++ /dev/null
@@ -1,16 +0,0 @@
-runners:
- local:
- upload_files:
- - common.py
- - grobid2json.py
- setup:
- - export PYTHONPATH=$PYTHONPATH:venv/lib/python3.5/site-packages/
- hadoop:
- no_output: true
- upload_files:
- - common.py
- - grobid2json.py
- setup:
- - export PYTHONPATH=$PYTHONPATH:venv/lib/python3.5/site-packages/
- cmdenv:
- SENTRY_DSN: https://6ab6ad080d034280b863f294e07cc5c6:414ebf0b68634f669d2dc00d7c935699@books-sentry.us.archive.org/9
diff --git a/python/tests/files/example_ungrobided.tsv b/python/tests/files/example_ungrobided.tsv
deleted file mode 100644
index 9263b6f..0000000
--- a/python/tests/files/example_ungrobided.tsv
+++ /dev/null
@@ -1,20 +0,0 @@
-sha1:23LOSW2QVMKUYXPFZBXQHBBNQR45WTMU {"c": 1, "d": "2017-10-27T22:21:13", "f": "PDFS-20171027214658-00155.warc.gz", "o": 984263791, "u": "http://circ.ahajournals.org/content/circulationaha/53/6/965.full.pdf"} application/pdf {"c_size": 1050532, "dt": "20171027222113", "offset": 984263791, "surt": "org,ahajournals,circ)/content/circulationaha/53/6/965.full.pdf", "url": "http://circ.ahajournals.org/content/circulationaha/53/6/965.full.pdf", "warc": "PDFS-20171027125450-crawl815/PDFS-20171027214658-00155.warc.gz"}
-sha1:23M2N262M5TWB7F3BVB6ESD3Q26SMPFA {"c": 1, "d": "2012-09-29T07:05:16", "f": "ARCHIVEIT-219-QUARTERLY-FWGZDI-20120929065657-00119-crawling203.us.archive.org-6680.warc.gz", "o": 83570746, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=37&id=225&artlang=en"} application/pdf {"c_size": 3590, "dt": "20120929070516", "offset": 83570746, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=37&id=225", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=37&id=225&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-FWGZDI-00001/ARCHIVEIT-219-QUARTERLY-FWGZDI-20120929065657-00119-crawling203.us.archive.org-6680.warc.gz"}
-sha1:23MFQLDGP4WJD67BS7ERMYQUF7TGCG5X {"c": 1, "d": "2017-08-25T15:19:28", "f": "MSAG-PDF-CRAWL-2017-08-04-20170825143335512-08107-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", "o": 573475485, "u": "http://www.bloodjournal.org/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true"} application/pdf {"c_size": 3411470, "dt": "20170825151928", "offset": 573475485, "surt": "org,bloodjournal)/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true", "url": "http://www.bloodjournal.org/content/bloodjournal/77/7/1484.full.pdf?sso-checked=true", "warc": "MSAG-PDF-CRAWL-2017-08-04-20170825114428485-08102-08111-wbgrp-svc284/MSAG-PDF-CRAWL-2017-08-04-20170825143335512-08107-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"}
-sha1:23MG6K5Z3JENYCZ2OGTNOJ7QPYANJRCZ {"c": 1, "d": "2017-10-10T11:29:50", "f": "PDFS-20171010110639-00278.warc.gz", "o": 665222706, "u": "http://circ.ahajournals.org/content/circulationaha/53/5/797.full.pdf?download=true"} application/pdf {"c_size": 1107121, "dt": "20171010112950", "offset": 665222706, "surt": "org,ahajournals,circ)/content/circulationaha/53/5/797.full.pdf?download=true", "url": "http://circ.ahajournals.org/content/circulationaha/53/5/797.full.pdf?download=true", "warc": "PDFS-20171010110639-crawl815/PDFS-20171010110639-00278.warc.gz"}
-sha1:23MN67JQKDWRUXJMXXJ2GX6O43SQIV76 {"c": 1, "d": "2015-06-06T18:17:08", "f": "ARCHIVEIT-219-QUARTERLY-9582-20150606125000169-00071-wbgrp-crawl067.us.archive.org-6440.warc.gz", "o": 603211220, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=95&artlang=en"} application/pdf {"c_size": 3450, "dt": "20150606181708", "offset": 603211220, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=95", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=95&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-9582-00007/ARCHIVEIT-219-QUARTERLY-9582-20150606125000169-00071-wbgrp-crawl067.us.archive.org-6440.warc.gz"}
-sha1:23MQCEOMQS5SMZCXIPNQ4E3ZKOCF6DIM {"c": 1, "d": "2004-01-02T19:45:22", "f": "DU_crawl10.20040102194453.arc.gz", "o": 38062592, "u": "http://www.csupomona.edu:80/~darsadmin/ACTIONITEMS.pdf"} application/pdf {"c_size": 15406, "dt": "20040102194522", "offset": 38062592, "surt": "edu,csupomona)/~darsadmin/actionitems.pdf", "url": "http://www.csupomona.edu:80/~darsadmin/ACTIONITEMS.pdf", "warc": "DU_crawl10.20040102181929-c/DU_crawl10.20040102194453.arc.gz"}
-sha1:23NKO4TW6XCESXMSUOOICI3AXVK6Z5BL {"c": 1, "d": "2015-04-27T11:16:33", "f": "eric.ed.gov-inf-20150409-030712-1648j-00064.warc.gz", "o": 3872820483, "u": "http://files.eric.ed.gov/fulltext/ED088632.pdf"} application/pdf {"c_size": 2223528, "dt": "20150427111633", "offset": 3872820483, "surt": "gov,ed,eric,files)/fulltext/ed088632.pdf", "url": "http://files.eric.ed.gov/fulltext/ED088632.pdf", "warc": "archiveteam_archivebot_go_20150427150006/eric.ed.gov-inf-20150409-030712-1648j-00064.warc.gz"}
-sha1:23NW2EPLXDA6UBIJLQMM2DJ2K3GL3WTB {"c": 1, "d": "2014-08-13T09:04:30", "f": "WIDE-20140813084304-09684.warc.gz", "o": 726289594, "u": "http://research.sdccd.edu/docs/Accreditation/2012%20Surveys/Employee%20-%20Briefing/Mesa%20College%202012%20Employee%20Feedback%20Survey%20Briefing.pdf"} application/pdf {"c_size": 3472527, "dt": "20140813090430", "offset": 726289594, "surt": "edu,sdccd,research)/docs/accreditation/2012%20surveys/employee%20-%20briefing/mesa%20college%202012%20employee%20feedback%20survey%20briefing.pdf", "url": "http://research.sdccd.edu/docs/Accreditation/2012%20Surveys/Employee%20-%20Briefing/Mesa%20College%202012%20Employee%20Feedback%20Survey%20Briefing.pdf", "warc": "WIDE-20140813074743-crawl424/WIDE-20140813084304-09684.warc.gz"}
-sha1:23OQICQ4IBVNHJBWJX5ON3QR26KNMQNT {"c": 1, "d": "2010-06-29T00:35:17", "f": "EDG-20100628234135-01241-ia360918.us.archive.org.warc.gz", "o": 572430160, "u": "http://journalism.arizona.edu/news/rockypt6.pdf"} application/pdf {"c_size": 194706, "dt": "20100629003517", "offset": 572430160, "surt": "edu,arizona,journalism)/news/rockypt6.pdf", "url": "http://journalism.arizona.edu/news/rockypt6.pdf", "warc": "EDG-20100628214935-01235-01243-ia360918-20100629023741-00000/EDG-20100628234135-01241-ia360918.us.archive.org.warc.gz"}
-sha1:23OT2AAYPJ3Z5ZOQXVJJTVTKY6QUPICI {"c": 1, "d": "2007-02-25T17:49:01", "f": "38_0_20070225174831_crawl28.arc.gz", "o": 93868066, "u": "http://www.ece.tufts.edu:80/~hopwood/tampa-proceedings.pdf"} application/pdf {"c_size": 162157, "dt": "20070225174901", "offset": 93868066, "surt": "edu,tufts,ece)/~hopwood/tampa-proceedings.pdf", "url": "http://www.ece.tufts.edu:80/~hopwood/tampa-proceedings.pdf", "warc": "38_0_20070225173722_crawl28-c/38_0_20070225174831_crawl28.arc.gz"}
-sha1:23OUFX3ZYMF53HY4RUONR5PKN4HXN4O3 {"c": 1, "d": "2004-05-26T06:45:34", "f": "DW_crawl10.20040526064432.arc.gz", "o": 67593910, "u": "http://207.36.165.114:80/NewOrleans/Papers/1301466.pdf"} application/pdf {"c_size": 306879, "dt": "20040526064534", "offset": 67593910, "surt": "114,165,36,207)/neworleans/papers/1301466.pdf", "url": "http://207.36.165.114:80/NewOrleans/Papers/1301466.pdf", "warc": "DW_crawl10.20040525230808-c/DW_crawl10.20040526064432.arc.gz"}
-sha1:23PA23UIWCBA3CSTDK2JYX7ZIVOHULFG {"c": 1, "d": "2016-02-05T21:48:33", "f": "NLNZ-NZ-CRAWL-005-20160205211003375-02839-6291~wbgrp-crawl007.us.archive.org~8443.warc.gz", "o": 630386943, "u": "http://homepages.engineering.auckland.ac.nz/~smohan/Outreach/Docs/2013/TTU_REU2013.pdf"} application/pdf {"c_size": 2979614, "dt": "20160205214833", "offset": 630386943, "surt": "nz,ac,auckland,engineering,homepages)/~smohan/outreach/docs/2013/ttu_reu2013.pdf", "url": "http://homepages.engineering.auckland.ac.nz/~smohan/Outreach/Docs/2013/TTU_REU2013.pdf", "warc": "NLNZ-NZ-CRAWL-005-20160205211003375-02839-02848-wbgrp-crawl007/NLNZ-NZ-CRAWL-005-20160205211003375-02839-6291~wbgrp-crawl007.us.archive.org~8443.warc.gz"}
-sha1:23PGC74CTD7P6PCF3MZZZJMPYFXRK3OB {"c": 1, "d": "2005-03-17T15:05:51", "f": "EC_binary1_crawl30.20050317150502.arc.gz", "o": 75675778, "u": "http://www.csupomona.edu:80/%7Eengineering/programs/courses/aro/course_outlines/aro_407.pdf"} application/pdf {"c_size": 4842, "dt": "20050317150551", "offset": 75675778, "surt": "edu,csupomona)/~engineering/programs/courses/aro/course_outlines/aro_407.pdf", "url": "http://www.csupomona.edu:80/%7Eengineering/programs/courses/aro/course_outlines/aro_407.pdf", "warc": "EC_binary1_crawl30.20050317135651-c/EC_binary1_crawl30.20050317150502.arc.gz"}
-sha1:23PKJEQWUJAIQQSLP3GCCC5VDXN4RFCX {"c": 1, "d": "2017-10-10T23:50:37", "f": "WIDE-20171010214240-16560.warc.gz", "o": 962106404, "u": "http://www.nbrb.by/bv/articles/8997.pdf"} application/pdf {"c_size": 273375, "dt": "20171010235037", "offset": 962106404, "surt": "by,nbrb)/bv/articles/8997.pdf", "url": "http://www.nbrb.by/bv/articles/8997.pdf", "warc": "WIDE-20171010202419-crawl424/WIDE-20171010214240-16560.warc.gz"}
-sha1:23PRILJUIQUKHRYQIUYAKSBFPH53FOGT {"c": 1, "d": "2017-07-14T18:51:38", "f": "WIDE-20170714181144-06521.warc.gz", "o": 820382225, "u": "http://carsandracingstuff.com/library/articles/32538.pdf"} application/pdf {"c_size": 125426, "dt": "20170714185138", "offset": 820382225, "surt": "com,carsandracingstuff)/library/articles/32538.pdf", "url": "http://carsandracingstuff.com/library/articles/32538.pdf", "warc": "WIDE-20170714174218-crawl426/WIDE-20170714181144-06521.warc.gz"}
-sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI {"c": 1, "d": "2016-06-09T00:27:36", "f": "WIDE-20160609001810-06993.warc.gz", "o": 287880616, "u": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf"} application/pdf {"c_size": 68262, "dt": "20160609002736", "offset": 287880616, "surt": "eu,case-research)/sites/default/files/publications/18092393_e-brief_dabrowski_monetary_policy_final_0.pdf", "url": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf", "warc": "WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz"}
-sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D {"c": 1, "d": "2016-01-07T03:29:03", "f": "MUSEUM-20160107025230-02354.warc.gz", "o": 413484441, "u": "http://www.portlandoregon.gov/fire/article/363695"} application/pdf {"c_size": 44600, "dt": "20160107032903", "offset": 413484441, "surt": "gov,portlandoregon)/fire/article/363695", "url": "http://www.portlandoregon.gov/fire/article/363695", "warc": "MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz"}
-sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT {"c": 1, "d": "2014-06-07T18:00:56", "f": "ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", "o": 720590380, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en"} application/pdf {"c_size": 3727, "dt": "20140607180056", "offset": 720590380, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=264", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz"}
-sha1:23SMLYPFEGIRV6M37FJ5D364TXQXCSMR {"c": 1, "d": "2011-07-12T22:20:32", "f": "WIDE-20110712221302-03146.warc.gz", "o": 222089710, "u": "http://media.dailyuw.com/papers/_091030_7-14_color_web.pdf"} application/pdf {"c_size": 4654708, "dt": "20110712222032", "offset": 222089710, "surt": "com,dailyuw,media)/papers/_091030_7-14_color_web.pdf", "url": "http://media.dailyuw.com/papers/_091030_7-14_color_web.pdf", "warc": "WIDE-20110712221302-crawl413/WIDE-20110712221302-03146.warc.gz"}
-sha1:23SN4XBPSCRPRIHH5UAV45LFCP3VDV3V {"c": 1, "d": "2010-10-28T09:03:57", "f": "WIDE-20101028084449158-00409-23450~ia360921.us.archive.org~9443.warc.gz", "o": 756726028, "u": "http://cdacnoida.in/ASCNT-2010/Language%20Technology/Paper/Reducing%20Errors%20in%20Translation%20using%20Pre-editor%20for%20Indian%20English%20Sentences.pdf"} application/pdf {"c_size": 98408, "dt": "20101028090357", "offset": 756726028, "surt": "in,cdacnoida)/ascnt-2010/language%20technology/paper/reducing%20errors%20in%20translation%20using%20pre-editor%20for%20indian%20english%20sentences.pdf", "url": "http://cdacnoida.in/ASCNT-2010/Language%20Technology/Paper/Reducing%20Errors%20in%20Translation%20using%20Pre-editor%20for%20Indian%20English%20Sentences.pdf", "warc": "WIDE-20101028063239344-00397-00415-ia360921/WIDE-20101028084449158-00409-23450~ia360921.us.archive.org~9443.warc.gz"}
diff --git a/python/tests/test_backfill_hbase_from_cdx.py b/python/tests/test_backfill_hbase_from_cdx.py
deleted file mode 100644
index 070662b..0000000
--- a/python/tests/test_backfill_hbase_from_cdx.py
+++ /dev/null
@@ -1,74 +0,0 @@
-"""
-TODO: could probably refactor to use unittest.mock.patch('happybase')
-"""
-
-import io
-import json
-import pytest
-import mrjob
-import happybase_mock
-from backfill_hbase_from_cdx import MRCDXBackfillHBase
-
-@pytest.fixture
-def job():
- """
- Note: this mock only seems to work with job.run_mapper(), not job.run();
- the later results in a separate instantiation without the mock?
- """
- job = MRCDXBackfillHBase(['--no-conf', '-'])
-
- conn = happybase_mock.Connection()
- conn.create_table('wbgrp-journal-extract-test',
- {'file': {}, 'grobid0': {}, 'f': {}})
- job.hb_table = conn.table('wbgrp-journal-extract-test')
-
- return job
-
-
-def test_some_lines(job):
-
- raw = io.BytesIO(b"""
-com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
-eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
-com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
-""")
-
- job.sandbox(stdin=raw)
- job.run_mapper()
-
- assert job.hb_table.row(b'1') == {}
- # HTTP 301
- assert job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {}
- # valid
- assert job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') != {}
- # text/plain
- assert job.hb_table.row(b'sha1:6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD') == {}
-
- row = job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J')
- assert row[b'file:mime'] == b"application/pdf"
-
- file_cdx = json.loads(row[b'file:cdx'].decode('utf-8'))
- assert int(file_cdx['offset']) == 328850624
-
- f_c = json.loads(row[b'f:c'].decode('utf-8'))
- assert f_c['u'] == "http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1"
- assert b'i' not in f_c
-
-def test_parse_cdx_skip(job):
-
- job.mapper_init()
-
- print("CDX prefix")
- raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.mapper(None, raw).__next__()
- assert info is None
- assert status['status'] == "invalid"
- assert 'prefix' in status['reason']
-
- print("mimetype")
- raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf text/html 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.mapper(None, raw).__next__()
- assert info is None
- assert status['status'] == "skip"
- assert 'mimetype' in status['reason']
-
diff --git a/python/tests/test_extraction_cdx_grobid.py b/python/tests/test_extraction_cdx_grobid.py
deleted file mode 100644
index 471d94a..0000000
--- a/python/tests/test_extraction_cdx_grobid.py
+++ /dev/null
@@ -1,319 +0,0 @@
-
-import io
-import json
-import mrjob
-import pytest
-import struct
-import responses
-import happybase_mock
-import wayback.exception
-from unittest import mock
-from extraction_cdx_grobid import MRExtractCdxGrobid
-
-
-FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)
-OK_CDX_LINE = b"""com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"""
-
-with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f:
- REAL_TEI_XML = f.read()
-
-@pytest.fixture
-def job():
- """
- Note: this mock only seems to work with job.run_mapper(), not job.run();
- the later results in a separate instantiation without the mock?
- """
- job = MRExtractCdxGrobid(['--no-conf', '-'])
-
- conn = happybase_mock.Connection()
- conn.create_table('wbgrp-journal-extract-test',
- {'file': {}, 'grobid0': {}, 'f': {}})
- job.hb_table = conn.table('wbgrp-journal-extract-test')
-
- return job
-
-
-@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
-@responses.activate
-def test_mapper_lines(mock_fetch, job):
-
- responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
- body=REAL_TEI_XML, content_type='text/xml')
-
- raw = io.BytesIO(b"""
-com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
-eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
-com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
-""")
-
- output = io.BytesIO()
- job.sandbox(stdin=raw, stdout=output)
-
- job.run_mapper()
-
- # for debugging tests
- #print(output.getvalue().decode('utf-8'))
- #print(list(job.hb_table.scan()))
-
- # wayback gets FETCH 1x times
- mock_fetch.assert_called_once_with(
- "CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz",
- 328850624,
- 854156)
-
- # grobid gets POST 1x times
- assert len(responses.calls) == 1
-
- # HBase
- assert job.hb_table.row(b'1') == {}
- # HTTP 301
- assert job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {}
- # valid
- assert job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') != {}
- # text/plain
- assert job.hb_table.row(b'sha1:6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD') == {}
-
- # Saved extraction info
- row = job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J')
-
- assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES)
- assert row[b'file:mime'] == b"application/pdf"
- assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
- # TODO: assert row[b'grobid0:quality'] == None
- status = json.loads(row[b'grobid0:status'].decode('utf-8'))
- assert type(status) == type(dict())
- assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML
- tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))
- metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8'))
- assert tei_json['title'] == metadata['title']
- assert 'body' in tei_json
- assert 'body' not in metadata
-
-def test_parse_cdx_invalid(job):
-
- print("valid")
- raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_line(raw)
- assert status is None
-
- print("space-prefixed line")
- raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_line(raw)
- assert info is None
- assert status['status'] == "invalid"
- assert 'prefix' in status['reason']
-
- print("commented line")
- raw = "#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_line(raw)
- assert info is None
- assert status['status'] == "invalid"
- assert 'prefix' in status['reason']
-
- print("wrong column count")
- raw = "a b c d"
- info, status = job.parse_line(raw)
- assert info is None
- assert status['status'] == "invalid"
- assert 'parse' in status['reason']
-
- print("missing mimetype")
- raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf - 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_line(raw)
- assert info is None
- print(status)
- assert status['status'] == "invalid"
- assert 'parse' in status['reason']
-
- print("HTTP status")
- raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 501 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_line(raw)
- assert info is None
- assert status['status'] == "invalid"
-
- print("datetime")
- raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 501 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_line(raw)
- assert info is None
- assert status['status'] == "invalid"
-
-
-def test_parse_cdx_skip(job):
-
- job.mapper_init()
-
- print("warc format")
- raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.mapper(None, raw).__next__()
- assert info is None
- assert status['status'] == "skip"
- assert 'WARC' in status['reason']
-
- print("mimetype")
- raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf text/html 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.mapper(None, raw).__next__()
- assert info is None
- assert status['status'] == "skip"
- assert 'mimetype' in status['reason']
-
-
-@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
-@responses.activate
-def test_grobid_503(mock_fetch, job):
-
- status = b'{"status": "done broke due to 503"}'
- responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=503,
- body=status)
-
- output = io.BytesIO()
- job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
- job.run_mapper()
- row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')
- status = json.loads(row[b'grobid0:status'].decode('utf-8'))
- assert json.loads(row[b'grobid0:status'].decode('utf-8')) == status
-
-
-@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
-@responses.activate
-def test_grobid_not_xml(mock_fetch, job):
-
- payload = b'this is not XML'
- responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
- body=payload)
-
- output = io.BytesIO()
- job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
- job.run_mapper()
- output = output.getvalue().decode('utf-8')
- row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')
- assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
- assert row[b'grobid0:tei_xml'] == payload
- assert b'grobid0:tei_json' not in row
- assert "XML parse error" in output
-
-
-@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
-@responses.activate
-def test_grobid_not_tei(mock_fetch, job):
-
- payload = b'<xml></xml>'
- responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
- body=payload)
-
- output = io.BytesIO()
- job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
- job.run_mapper()
- output = output.getvalue().decode('utf-8')
- row = job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ')
- assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
- assert row[b'grobid0:tei_xml'] == payload
- assert b'grobid0:tei_json' not in row
- assert "non-TEI content" in output
-
-
-@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
-def test_grobid_invalid_connection(mock_fetch, job):
-
- status = b'{"status": "done broke"}'
- job.options.grobid_uri = 'http://host.invalid:8070/api/processFulltextDocument'
-
- output = io.BytesIO()
- job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
- job.run_mapper()
- output = output.getvalue().decode('utf-8')
- assert 'error' in output
- assert 'GROBID' in output
- assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {}
-
-
-def test_wayback_failure(job):
-
- job.options.warc_uri_prefix = 'http://host.invalid/'
-
- output = io.BytesIO()
- job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
- job.run_mapper()
- output = output.getvalue().decode('utf-8')
- assert 'error' in output
- assert 'wayback' in output
- assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {}
-
-
-@mock.patch('extraction_cdx_grobid.ResourceStore')
-def test_wayback_not_found(mock_rs, job):
-
- # This is... a little convoluded. Basically creating a 404 situation for
- # reading a wayback resource.
- mock_resource = mock.MagicMock()
- mock_resource.get_status.return_value = (404, "Not Found")
- mock_rso = mock.MagicMock()
- mock_rso.load_resource.return_value = mock_resource
- mock_rs.return_value = mock_rso
- print(mock_rs().load_resource().get_status())
-
- job.options.warc_uri_prefix = 'http://dummy-archive.org/'
-
- output = io.BytesIO()
- job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output)
- job.run_mapper()
- output = output.getvalue().decode('utf-8')
-
- print(output)
- assert 'error' in output
- assert 'not 200' in output
- assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') == {}
-
-
-@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
-@responses.activate
-def test_mapper_rerun(mock_fetch, job):
-
- responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
- body=REAL_TEI_XML, content_type='text/xml')
-
- output1 = io.BytesIO()
- job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1)
- job.run_mapper()
- output1 = output1.getvalue().decode('utf-8')
-
- # wayback gets FETCH 1x times
- assert mock_fetch.call_count == 1
- # grobid gets POST 1x times
- assert len(responses.calls) == 1
- # HBase
- assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') != {}
- assert 'success' in output1
-
- # Run again, same line
- output2 = io.BytesIO()
- job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output2)
- job.run_mapper()
- output2 = output2.getvalue().decode('utf-8')
-
- # wayback still only FETCH 1x times
- assert mock_fetch.call_count == 1
- # grobid still only POST 1x times
- assert len(responses.calls) == 1
- assert 'existing' in output2
-
-@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
-@responses.activate
-def test_mapper_previously_backfilled(mock_fetch, job):
-
- responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
- body=REAL_TEI_XML, content_type='text/xml')
-
- job.hb_table.put(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ',
- {b'f:c': b'{"some": "dict"}', b'file:col': b'bogus'})
- assert job.hb_table.row(b'sha1:ABCDEF12345Q2MSVX7XZKYAYSCX5QBYJ') != {}
-
- output1 = io.BytesIO()
- job.sandbox(stdin=io.BytesIO(OK_CDX_LINE), stdout=output1)
- job.run_mapper()
- output1 = output1.getvalue().decode('utf-8')
-
- # wayback gets FETCH 1x times
- assert mock_fetch.call_count == 1
- # grobid gets POST 1x times
- assert len(responses.calls) == 1
- assert 'success' in output1
diff --git a/python/tests/test_extraction_ungrobided.py b/python/tests/test_extraction_ungrobided.py
deleted file mode 100644
index cb46d29..0000000
--- a/python/tests/test_extraction_ungrobided.py
+++ /dev/null
@@ -1,178 +0,0 @@
-
-import io
-import json
-import mrjob
-import pytest
-import struct
-import responses
-import happybase_mock
-import wayback.exception
-from unittest import mock
-from common import parse_ungrobided_line
-from extraction_ungrobided import MRExtractUnGrobided
-
-
-FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)
-OK_UNGROBIDED_LINE = b"\t".join((
- b"sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ",
- b"""{"c": 1, "d": "2017-07-06T07:54:11", "f": "CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz", "o": 914718776, "u": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250"}""",
- b"application/pdf",
- b"""{"c_size": 501, "dt": "20170706075411", "offset": 914718776, "surt": "org,ibc7)/article/file_down.php?mode=article_print&pid=250", "url": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250", "warc": "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz"}""",
-))
-
-with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f:
- REAL_TEI_XML = f.read()
-
-@pytest.fixture
-def job():
- """
- Note: this mock only seems to work with job.run_mapper(), not job.run();
- the later results in a separate instantiation without the mock?
- """
- job = MRExtractUnGrobided(['--no-conf', '-'])
-
- conn = happybase_mock.Connection()
- conn.create_table('wbgrp-journal-extract-test',
- {'file': {}, 'grobid0': {}, 'f': {}})
- job.hb_table = conn.table('wbgrp-journal-extract-test')
-
- return job
-
-
-@mock.patch('extraction_ungrobided.MRExtractUnGrobided.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
-@responses.activate
-def test_mapper_single_line(mock_fetch, job):
-
- responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
- body=REAL_TEI_XML, content_type='text/xml')
-
- raw = io.BytesIO(OK_UNGROBIDED_LINE)
-
- output = io.BytesIO()
- job.sandbox(stdin=raw, stdout=output)
-
- job.run_mapper()
-
- # for debugging tests
- #print(output.getvalue().decode('utf-8'))
- #print(list(job.hb_table.scan()))
-
- # wayback gets FETCH 1x times
- mock_fetch.assert_called_once_with(
- "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz",
- 914718776,
- 501)
-
- # grobid gets POST 1x times
- assert len(responses.calls) == 1
-
- # HBase
- assert job.hb_table.row(b'1') == {}
-
- # Saved extraction info
- row = job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ')
-
- assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES)
- # file:mime should actually not get clobbered by GROBID updater
- #assert row[b'file:mime'] == b"application/pdf"
- assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
- # TODO: assert row[b'grobid0:quality'] == None
- status = json.loads(row[b'grobid0:status'].decode('utf-8'))
- assert type(status) == type(dict())
- assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML
- tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))
- metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8'))
- assert tei_json['title'] == metadata['title']
- assert 'body' in tei_json
- assert 'body' not in metadata
-
-@mock.patch('extraction_ungrobided.MRExtractUnGrobided.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
-@responses.activate
-def test_mapper_lines(mock_fetch, job):
-
- responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
- body=REAL_TEI_XML, content_type='text/xml')
-
- raw = io.BytesIO(b"""sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI\t{"c": 1, "d": "2016-06-09T00:27:36", "f": "WIDE-20160609001810-06993.warc.gz", "o": 287880616, "u": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf"}\tapplication/pdf\t{"c_size": 68262, "dt": "20160609002736", "offset": 287880616, "surt": "eu,case-research)/sites/default/files/publications/18092393_e-brief_dabrowski_monetary_policy_final_0.pdf", "url": "http://www.case-research.eu/sites/default/files/publications/18092393_E-brief_Dabrowski_Monetary_Policy_final_0.pdf", "warc": "WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz"}
-sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D\t{"c": 1, "d": "2016-01-07T03:29:03", "f": "MUSEUM-20160107025230-02354.warc.gz", "o": 413484441, "u": "http://www.portlandoregon.gov/fire/article/363695"}\tapplication/pdf\t{"c_size": 44600, "dt": "20160107032903", "offset": 413484441, "surt": "gov,portlandoregon)/fire/article/363695", "url": "http://www.portlandoregon.gov/fire/article/363695", "warc": "MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz"}
-sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT\t{"c": 1, "d": "2014-06-07T18:00:56", "f": "ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", "o": 720590380, "u": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en"}\tapplication/pdf\t{"c_size": 3727, "dt": "20140607180056", "offset": 720590380, "surt": "edu,indiana)/~orafaq/faq/pdf.php?artlang=en&cat=36&id=264", "url": "https://www.indiana.edu/~orafaq/faq/pdf.php?cat=36&id=264&artlang=en", "warc": "ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz"}""")
-
-
- output = io.BytesIO()
- job.sandbox(stdin=raw, stdout=output)
-
- job.run_mapper()
-
- # for debugging tests
- #print(output.getvalue().decode('utf-8'))
- #print(list(job.hb_table.scan()))
-
- # grobid gets POST 3x times
- assert len(responses.calls) == 3
-
- # wayback gets FETCH 3x times
- mock_fetch.assert_has_calls((
- mock.call("WIDE-20160609000312-crawl427/WIDE-20160609001810-06993.warc.gz", 287880616, 68262),
- mock.call("MUSEUM-20160107004301-crawl891/MUSEUM-20160107025230-02354.warc.gz", 413484441, 44600),
- mock.call("ARCHIVEIT-219-QUARTERLY-20047-00001/ARCHIVEIT-219-QUARTERLY-20047-20140607125555378-00017-wbgrp-crawl051.us.archive.org-6442.warc.gz", 720590380, 3727),
- ))
-
- # Saved extraction info
- assert job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {}
- assert job.hb_table.row(b'sha1:23PTUXWSNSVE4HS5J7ELDUUG63J2FPCI') != {}
- assert job.hb_table.row(b'sha1:23PW2APYHNBPIBRIVNQ6TMKUNY53UL3D') != {}
- assert job.hb_table.row(b'sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT') != {}
-
- row = job.hb_table.row(b'sha1:23RJIHUIOYY5747CR6YYCTMACXDCFYTT')
- assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES)
- # file:mime should actually not get clobbered by GROBID updater
- #assert row[b'file:mime'] == b"application/pdf"
- assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
- status = json.loads(row[b'grobid0:status'].decode('utf-8'))
- assert type(status) == type(dict())
- assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML
- tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))
- metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8'))
- assert tei_json['title'] == metadata['title']
- assert 'body' in tei_json
- assert 'body' not in metadata
-
-def test_parse_ungrobided_invalid(job):
-
- print("space-prefixed line")
- raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_ungrobided_line(raw)
- assert info is None
- assert status['status'] == "invalid"
- assert 'prefix' in status['reason']
-
- print("commented line")
- raw = "#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_ungrobided_line(raw)
- assert info is None
- assert status['status'] == "invalid"
- assert 'prefix' in status['reason']
-
- print("wrong column count")
- raw = "a b c d e"
- info, status = job.parse_ungrobided_line(raw)
- assert info is None
- assert status['status'] == "invalid"
- assert 'parse' in status['reason']
-
- print("CDX line, somehow")
- raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf - 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
- info, status = job.parse_ungrobided_line(raw)
- assert info is None
- print(status)
- assert status['status'] == "invalid"
- assert 'parse' in status['reason']
-
-def test_parse_ungrobided_valid():
-
- parsed = parse_ungrobided_line(OK_UNGROBIDED_LINE.decode('utf-8'))
- assert parsed['key'] == "sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ"
- assert parsed['f:c']['u'] == "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250"
- assert parsed['file:mime'] == "application/pdf"
- assert parsed['file:cdx']['c_size'] == 501
- assert parsed['file:cdx']['dt'] == "20170706075411"