aboutsummaryrefslogtreecommitdiffstats
path: root/mapreduce
diff options
context:
space:
mode:
Diffstat (limited to 'mapreduce')
-rwxr-xr-xmapreduce/extraction_cdx_grobid.py87
-rwxr-xr-xmapreduce/grobid2json.py21
-rw-r--r--mapreduce/tests/test_extraction_cdx_grobid.py41
3 files changed, 93 insertions, 56 deletions
diff --git a/mapreduce/extraction_cdx_grobid.py b/mapreduce/extraction_cdx_grobid.py
index ea36e6e..0ba95e6 100755
--- a/mapreduce/extraction_cdx_grobid.py
+++ b/mapreduce/extraction_cdx_grobid.py
@@ -15,16 +15,19 @@ Requires:
import io
import sys
+import json
import struct
import requests
import happybase
import mrjob
from mrjob.job import MRJob
+import wayback.exception
from wayback.resource import Resource
from wayback.resource import ArcResource
from wayback.resourcestore import ResourceStore
from gwb.loader import CDXLoaderFactory
from common import parse_cdx_line
+from grobid2json import do_tei
class MRExtractCdxGrobid(MRJob):
@@ -60,16 +63,13 @@ class MRExtractCdxGrobid(MRJob):
super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
self.mime_filter = ['application/pdf']
- def grobid_fulltext(self, content):
+ def grobid_process_fulltext(self, content):
r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
files={'input': content})
if r.status_code is not 200:
# XXX:
- print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8')))
- else:
- # XXX:
- print("SUCCESS: " + debug_line)
- return r.json()
+ return None
+ return r
def mapper_init(self):
@@ -104,47 +104,56 @@ class MRExtractCdxGrobid(MRJob):
return info, None
- def extract(self, info):
-
- # Fetch data from WARCs in petabox
+ def fetch_warc_content(self, warc_path, offset, c_size):
try:
rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
- gwb_record = rstore.load_resource(
- info['file:cdx']['warc'],
- info['file:cdx']['offset'],
- info['file:cdx']['c_size'])
- except IOError as ioe:
- # XXX: catch correct error
- self.increment_counter('lines', 'existing')
- return _, dict(status="existing")
+ gwb_record = rstore.load_resource(warc_path, offset, c_size)
+ except wayback.exception.ResourceUnavailable as err:
+ # XXX: during testing
+ raise err
+ self.increment_counter('lines', 'petabox_error')
+ return None, dict(status="petabox_error", reason="failed to load file contents")
if gwb_record.get_status()[0] != 200:
self.increment_counter('lines', 'error')
- return _, dict(status="error", reason="non-HTTP-200 WARC content")
+ return None, dict(status="error", reason="non-HTTP-200 WARC content")
+ return gwb_record.open_raw_content()
+
+ def extract(self, info):
+
+ # Fetch data from WARCs in petabox
+ content, status = self.fetch_warc_content(
+ info['file:cdx']['warc'],
+ info['file:cdx']['offset'],
+ info['file:cdx']['c_size'])
+ if status:
+ self.increment_counter('lines', status['status'])
+ return None, status
+
+ info['file:size'] = len(content)
# Submit to GROBID
- content = gwb_record.open_raw_content()
try:
- grobid_result = self.grobid_fulltext(gwb_record.open_raw_content())
+ grobid_response = self.grobid_process_fulltext(content)
except IOError as ioe:
+ raise ioe
# XXX: catch correct error
- self.increment_counter('lines', 'existing')
- return _, dict(status="existing")
-
- info['file:size'] = len(resource_data)
+ self.increment_counter('lines', 'fail')
+ return None, dict(status="fail", reason="GROBID connection")
- info['grobid0:status_code'] = None
- info['grobid0:quality'] = None
- info['grobid0:status'] = {}
- info['grobid0:tei_xml'] = None
- info['grobid0:tei_json'] = {}
- info['grobid0:metadata'] = {}
+ info['grobid0:status_code'] = grobid_response.status_code
+ info['grobid0:tei_xml'] = grobid_response.content
+ info['grobid0:status'] = {} # TODO
# Convert TEI XML to JSON
- # TODO
+ # TODO:
+ info['grobid0:tei_json'] = do_tei(grobid_response.content, encumbered=True)
+ info['grobid0:metadata'] = do_tei(grobid_response.content, encumbered=False)
# Determine extraction "quality"
- # TODO
+ # TODO:
+
+ info['grobid0:quality'] = None
return info, None
@@ -187,19 +196,22 @@ class MRExtractCdxGrobid(MRJob):
# Decide what to bother inserting back into HBase
# Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
grobid_status = info.get('grobid0:status_code', None)
- for k in info.keys():
+ for k in list(info.keys()):
if k in oldrow:
info.pop(k)
# Convert fields to binary
- for k in info.keys():
- if k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+ for k in list(info.keys()):
+ if info[k] == None:
+ info.pop(k)
+ elif k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
'grobid0:metadata'):
assert type(info[k]) == dict
info[k] = json.dumps(info[k], sort_keys=True, indent=None)
- if k in ('file:size', 'grobid0:status_code'):
+ elif k in ('file:size', 'grobid0:status_code'):
# encode as int64 in network byte order
- info[k] = struct.pack('!q', info[k])
+ if info[k] != {} and info[k] != None:
+ info[k] = struct.pack('!q', info[k])
key = info.pop('key')
self.hb_table.put(key, info)
@@ -207,6 +219,7 @@ class MRExtractCdxGrobid(MRJob):
yield _, dict(status="success", grobid_status=grobid_status)
+
if __name__ == '__main__': # pragma: no cover
MRExtractCdxGrobid.run()
diff --git a/mapreduce/grobid2json.py b/mapreduce/grobid2json.py
index daf9387..cc6eb2c 100755
--- a/mapreduce/grobid2json.py
+++ b/mapreduce/grobid2json.py
@@ -20,6 +20,7 @@ Prints JSON to stdout, errors to stderr
"""
import os
+import io
import sys
import json
import argparse
@@ -73,11 +74,18 @@ def biblio_info(elem):
return ref
-def do_tei(path, encumbered=True):
+def do_tei(content, encumbered=True):
- info = dict(filename=os.path.basename(path))
+ if type(content) == str:
+ content = io.StringIO(content)
+ elif type(content) == bytes:
+ content = io.BytesIO(content)
- tree = ET.parse(path)
+ info = dict()
+
+ #print(content)
+ #print(content.getvalue())
+ tree = ET.parse(content)
tei = tree.getroot()
header = tei.find('.//{%s}teiHeader' % ns)
@@ -109,7 +117,7 @@ def do_tei(path, encumbered=True):
return info
-def main():
+def main(): # pragma no cover
parser = argparse.ArgumentParser(
description="GROBID TEI XML to JSON",
usage="%(prog)s [options] <teifile>...")
@@ -121,9 +129,10 @@ def main():
args = parser.parse_args()
for filename in args.teifiles:
+ content = open(filename, 'r')
print(json.dumps(
- do_tei(filename,
+ do_tei(content,
encumbered=(not args.no_encumbered))))
-if __name__=='__main__':
+if __name__=='__main__': # pragma no cover
main()
diff --git a/mapreduce/tests/test_extraction_cdx_grobid.py b/mapreduce/tests/test_extraction_cdx_grobid.py
index 1d32c9f..46a89aa 100644
--- a/mapreduce/tests/test_extraction_cdx_grobid.py
+++ b/mapreduce/tests/test_extraction_cdx_grobid.py
@@ -1,13 +1,17 @@
import io
import json
-import pytest
import mrjob
+import pytest
+import struct
import responses
import happybase_mock
+from unittest import mock
from extraction_cdx_grobid import MRExtractCdxGrobid
+FAKE_PDF_BYTES = b"%PDF SOME JUNK" + struct.pack("!q", 112853843)
+
@pytest.fixture
def job():
"""
@@ -22,13 +26,14 @@ def job():
job = MRExtractCdxGrobid(['--no-conf', '-'], hb_table=table)
return job
-
+@mock.patch('extraction_cdx_grobid.MRExtractCdxGrobid.fetch_warc_content', return_value=(FAKE_PDF_BYTES, None))
@responses.activate
-def test_mapper_lines(job):
+def test_mapper_lines(mock_fetch, job):
- fake_grobid = {}
- responses.add(responses.POST, 'http://localhost:9070/api/processFulltextDocument', status=200,
- body=json.dumps(fake_grobid), content_type='application/json')
+ with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f:
+ real_tei_xml = f.read()
+ responses.add(responses.POST, 'http://localhost:8070/api/processFulltextDocument', status=200,
+ body=real_tei_xml, content_type='application/json')
raw = io.BytesIO(b"""
com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
@@ -36,16 +41,23 @@ eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=76139301
com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
""")
- job.sandbox(stdin=raw)
+ output = io.BytesIO()
+ job.sandbox(stdin=raw, stdout=output)
- pytest.skip("need to mock wayback fetch")
job.run_mapper()
+ # for debugging tests
+ #print(output.getvalue().decode('utf-8'))
+ #print(list(job.hb_table.scan()))
+
# wayback gets FETCH 1x times
+ # TODO:
# grobid gets POST 3x times
+ # TODO:
# hbase
+ # TODO:
assert job.hb_table.row(b'1') == {}
@@ -58,15 +70,18 @@ com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robo
row = job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J')
- assert struct.unpack("", row[b'file:size']) == 12345
+ assert struct.unpack("!q", row[b'file:size'])[0] == len(FAKE_PDF_BYTES)
assert row[b'file:mime'] == b"application/pdf"
- assert struct.unpack("", row[b'grobid0:status_code']) == 200
- assert row[b'grobid0:quality'] == None # TODO
+ assert struct.unpack("!q", row[b'grobid0:status_code'])[0] == 200
+ # TODO: assert row[b'grobid0:quality'] == None
status = json.loads(row[b'grobid0:status'].decode('utf-8'))
- assert type(row[b'grobid0:status']) == type(dict())
- assert row[b'grobid0:tei_xml'] == "<xml><lorem>ipsum</lorem></xml>"
+ assert type(status) == type(dict())
+ assert row[b'grobid0:tei_xml'].decode('utf-8') == real_tei_xml
tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))
metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8'))
+ assert tei_json['title'] == metadata['title']
+ assert 'body' in tei_json
+ assert 'body' not in metadata
def test_parse_cdx_invalid(job):