aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/common.py26
-rwxr-xr-xpython/extraction_ungrobided.py136
-rw-r--r--python/tests/test_extraction_ungrobided.py126
3 files changed, 288 insertions, 0 deletions
diff --git a/python/common.py b/python/common.py
index 6710044..e596b35 100644
--- a/python/common.py
+++ b/python/common.py
@@ -1,4 +1,5 @@
+import json
from datetime import datetime
NORMAL_MIME = (
@@ -71,3 +72,28 @@ def parse_cdx_line(raw_cdx):
# 'i' intentionally not set
heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix}
+
+def parse_ungrobided_line(raw_line):
+
+ line = raw_line.strip().split("\t")
+ if len(line) != 4:
+ return None
+
+ key = line[0]
+ mime = normalize_mime(line[2])
+ try:
+ f_c = json.loads(line[1])
+ cdx = json.loads(line[3])
+ except json.JSONDecodeError:
+ return None
+
+ if not (key[5:].isalnum() and len(key) == 37 and mime != None):
+ print(mime)
+ print(key)
+ print("FAIL")
+ return None
+
+ if '-' in (key, mime, f_c, cdx):
+ return None
+
+ return {'key': key, 'file:mime': mime, 'file:cdx': cdx, 'f:c': f_c}
diff --git a/python/extraction_ungrobided.py b/python/extraction_ungrobided.py
new file mode 100755
index 0000000..aedf715
--- /dev/null
+++ b/python/extraction_ungrobided.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python3
+"""
+Variant of extraction_cdx_grobid which takes a partial metadata list as input
+instead of CDX.
+
+This task list is dumped by another Hadoop job which scans over the HBase table
+quickly, which allows this job to skip a (relatively) expensive HBase read
+per-row.
+
+Requires:
+- happybase
+- mrjob
+- wayback/GWB libraries
+"""
+
+# XXX: some broken MRO thing going on in here due to python3 object wrangling
+# in `wayback` library. Means we can't run pylint.
+# pylint: skip-file
+
+import xml
+import json
+import raven
+import struct
+import mrjob
+from common import parse_ungrobided_line
+from extraction_cdx_grobid import MRExtractCdxGrobid, KEY_BLACKLIST, \
+ sentry_client
+
+
+class MRExtractUnGrobided(MRExtractCdxGrobid):
+
+ # CDX lines in; JSON status out
+ #HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.NLineInputFormat'
+ #INPUT_PROTOCOL = mrjob.protocol.RawProtocol
+ INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
+ OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
+
+ def parse_line(self, raw_line):
+ """Line should be TSV and have non-null fields:
+
+ - key (string)
+ - f:c (string, json)
+ - file:mime (string)
+ - file:cdx (string, json)
+ """
+
+ if (raw_line.startswith(' ') or raw_line.startswith('#')):
+ return None, dict(status="invalid", reason="line prefix", input=raw_line)
+
+ info = parse_ungrobided_line(raw_line)
+ if info is None:
+ return None, dict(status="invalid", reason="ungrobided parse")
+
+ if info['file:mime'] not in self.mime_filter:
+ return None, dict(status="skip", reason="mimetype", mimetype=info['file:mime'])
+
+ # If warc is not item/file.(w)arc.gz form, skip it
+ if len(info['file:cdx']['warc'].split('/')) != 2:
+ return None, dict(status="skip", reason="WARC path not petabox item/file", path=info['file:cdx']['warc'])
+
+ return info, None
+
+ @sentry_client.capture_exceptions
+ def mapper(self, _, raw_line):
+ """
+ 1. parse filtered line
+ 2. fetch data from wayback
+ 3. submit to GROBID
+ 4. convert GROBID response to JSON (and metadata)
+ 6. determine "quality"
+ 6. push results to hbase
+ """
+
+ self.increment_counter('lines', 'total')
+
+ # Parse line and filter down
+ info, status = self.parse_line(raw_line)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ yield _, status
+ return
+ key = info['key']
+ if key in KEY_BLACKLIST:
+ self.increment_counter('lines', 'blacklist')
+ yield _, dict(status='blacklist', key=key)
+ return
+
+ # Note: this may not get "cleared" correctly
+ sentry_client.extra_context(dict(row_key=key))
+
+ # Do the extraction
+ info, status = self.extract(info)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ status['key'] = key
+ yield _, status
+ return
+ extraction_status = status
+
+ # Decide what to bother inserting back into HBase
+ # Basically, don't overwrite backfill fields.
+ grobid_status_code = info.get('grobid0:status_code', None)
+ for k in list(info.keys()):
+ if k.encode('utf-8') in ('f:c', 'file:mime', 'file:cdx'):
+ info.pop(k)
+
+ # Convert fields to binary
+ for k in list(info.keys()):
+ if info[k] is None:
+ info.pop(k)
+ # NOTE: we're not actually sending these f:*, file:* keys...
+ elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+ 'grobid0:metadata'):
+ assert type(info[k]) == dict
+ info[k] = json.dumps(info[k], sort_keys=True, indent=None)
+ elif k in ('file:size', 'grobid0:status_code'):
+ # encode as int64 in network byte order
+ if info[k] != {} and info[k] != None:
+ info[k] = struct.pack('!q', info[k])
+
+ key = info.pop('key')
+ self.hb_table.put(key, info)
+ self.increment_counter('lines', 'success')
+
+ if extraction_status is not None:
+ yield _, dict(status="partial", key=key,
+ grobid_status_code=grobid_status_code,
+ reason=extraction_status['reason'])
+ else:
+ yield _, dict(status="success",
+ grobid_status_code=grobid_status_code, key=key,
+ extra=extraction_status)
+
+
+if __name__ == '__main__': # pragma: no cover
+ MRExtractCdxGrobid.run()
diff --git a/python/tests/test_extraction_ungrobided.py b/python/tests/test_extraction_ungrobided.py
new file mode 100644
index 0000000..fb681c2
--- /dev/null
+++ b/python/tests/test_extraction_ungrobided.py
@@ -0,0 +1,126 @@
+
+import io
+import json
+import mrjob
+import pytest
+import struct
+import responses
+import happybase_mock
+import wayback.exception
+from unittest import mock
+from common import parse_ungrobided_line
+from extraction_ungrobided import MRExtractUnGrobided
+
+
+FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)
+OK_UNGROBIDED_LINE = b"\t".join((
+ b"sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ",
+ b"""{"c": 1, "d": "2017-07-06T07:54:11", "f": "CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz", "o": 914718776, "u": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250"}""",
+ b"application/pdf",
+ b"""{"c_size": 501, "dt": "20170706075411", "offset": 914718776, "surt": "org,ibc7)/article/file_down.php?mode=article_print&pid=250", "url": "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250", "warc": "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz"}""",
+))
+
+with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f:
+ REAL_TEI_XML = f.read()
+
+@pytest.fixture
+def job():
+ """
+ Note: this mock only seems to work with job.run_mapper(), not job.run();
+ the later results in a separate instantiation without the mock?
+ """
+ job = MRExtractUnGrobided(['--no-conf', '-'])
+
+ conn = happybase_mock.Connection()
+ conn.create_table('wbgrp-journal-extract-test',
+ {'file': {}, 'grobid0': {}, 'f': {}})
+ job.hb_table = conn.table('wbgrp-journal-extract-test')
+
+ return job
+
+
+@mock.patch('extraction_ungrobided.MRExtractUnGrobided.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
+@responses.activate
+def test_mapper_lines(mock_fetch, job):
+
+ responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
+ body=REAL_TEI_XML, content_type='text/xml')
+
+ raw = io.BytesIO(OK_UNGROBIDED_LINE)
+
+ output = io.BytesIO()
+ job.sandbox(stdin=raw, stdout=output)
+
+ job.run_mapper()
+
+ # for debugging tests
+ #print(output.getvalue().decode('utf-8'))
+ #print(list(job.hb_table.scan()))
+
+ # wayback gets FETCH 1x times
+ mock_fetch.assert_called_once_with(
+ "CITESEERX-CRAWL-2017-06-20-20170706074206206-00379-00388-wbgrp-svc285/CITESEERX-CRAWL-2017-06-20-20170706075012840-00388-3671~wbgrp-svc285.us.archive.org~8443.warc.gz",
+ 914718776,
+ 501)
+
+ # grobid gets POST 1x times
+ assert len(responses.calls) == 1
+
+ # HBase
+ assert job.hb_table.row(b'1') == {}
+
+ # Saved extraction info
+ row = job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ')
+
+ assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES)
+ assert row[b'file:mime'] == b"application/pdf"
+ assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
+ # TODO: assert row[b'grobid0:quality'] == None
+ status = json.loads(row[b'grobid0:status'].decode('utf-8'))
+ assert type(status) == type(dict())
+ assert row[b'grobid0:tei_xml'].decode('utf-8') == REAL_TEI_XML
+ tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))
+ metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8'))
+ assert tei_json['title'] == metadata['title']
+ assert 'body' in tei_json
+ assert 'body' not in metadata
+
+def test_parse_ungrobided_invalid(job):
+
+ print("space-prefixed line")
+ raw = " com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
+ info, status = job.parse_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+ assert 'prefix' in status['reason']
+
+ print("commented line")
+ raw = "#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
+ info, status = job.parse_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+ assert 'prefix' in status['reason']
+
+ print("wrong column count")
+ raw = "a b c d e"
+ info, status = job.parse_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+ assert 'parse' in status['reason']
+
+ print("CDX line, somehow")
+ raw = "com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf - 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz"
+ info, status = job.parse_line(raw)
+ assert info is None
+ print(status)
+ assert status['status'] == "invalid"
+ assert 'parse' in status['reason']
+
+def test_parse_ungrobided_valid():
+
+ parsed = parse_ungrobided_line(OK_UNGROBIDED_LINE.decode('utf-8'))
+ assert parsed['key'] == "sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ"
+ assert parsed['f:c']['u'] == "http://www.ibc7.org/article/file_down.php?mode%3Darticle_print%26pid%3D250"
+ assert parsed['file:mime'] == "application/pdf"
+ assert parsed['file:cdx']['c_size'] == 501
+ assert parsed['file:cdx']['dt'] == "20170706075411"