aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--extraction/TODO2
-rw-r--r--extraction/extraction.py52
-rwxr-xr-xextraction/extraction_cdx_grobid.py248
-rw-r--r--extraction/tests/test_extraction_cdx_grobid.py169
-rw-r--r--extraction/xml2json.py8
5 files changed, 427 insertions, 52 deletions
diff --git a/extraction/TODO b/extraction/TODO
new file mode 100644
index 0000000..ed10834
--- /dev/null
+++ b/extraction/TODO
@@ -0,0 +1,2 @@
+- abstract CDX line reading and HBase stuff out into a common library
+- actual GROBID_SERVER="http://wbgrp-svc096.us.archive.org:8070"
diff --git a/extraction/extraction.py b/extraction/extraction.py
deleted file mode 100644
index cdca433..0000000
--- a/extraction/extraction.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python3
-
-import io
-import sys
-import requests
-#import happybase
-import mrjob
-from mrjob.job import MRJob
-from wayback.resource import Resource
-from wayback.resource import ArcResource
-from wayback.resourcestore import ResourceStore
-from gwb.loader import CDXLoaderFactory
-
-
-def process_pdf_using_grobid(content_buffer, debug_line):
- """Query GrobId server & process response
- """
- GROBID_SERVER="http://wbgrp-svc096.us.archive.org:8070"
- content = content_buffer.read()
- r = requests.post(GROBID_SERVER + "/api/processFulltextDocument",
- files={'input': content})
- if r.status_code is not 200:
- print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8'), debug_line))
- else:
- print("SUCCESS: " + debug_line)
-
-class Cdx_Record_Pipeline(object):
-
- def read_cdx_and_parse(self, parser_func, accepted_mimes = []):
- """Read in CDX lines and process PDF records fetched over HTTP
- """
- rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
- for line in sys.stdin:
- line = line.rstrip()
- cdx_line = line.split()
- #ignoring NLine offset
- if len(cdx_line) != 12:
- continue
- cdx_line = cdx_line[1:]
- (src_url, timestamp, mime, record_location, record_offset, record_length) = (cdx_line[2], cdx_line[1], cdx_line[3], cdx_line[-1], cdx_line[-2], cdx_line[-3])
- if '-' == record_length or not record_location.endswith('arc.gz') or mime not in accepted_mimes:
- continue
- orig_url = cdx_line[2]
- debug_line = ' '.join(cdx_line)
- try:
- record_location = 'http://archive.org/download/' + record_location
- record_offset = int(record_offset)
- record_length = int(record_length)
- resource_data = rstore.load_resource(record_location, record_offset, record_length)
- parser_func(resource_data.open_raw_content(), debug_line)
- except:
- continue
diff --git a/extraction/extraction_cdx_grobid.py b/extraction/extraction_cdx_grobid.py
new file mode 100755
index 0000000..54d8b71
--- /dev/null
+++ b/extraction/extraction_cdx_grobid.py
@@ -0,0 +1,248 @@
+#!/usr/bin/env python3
+"""
+Streaming Hadoop script to import extract metadata and body from fulltext (eg,
+PDF) files using GROBID. Input is a CDX file; results primarly go to HBase,
+with status written to configurable output stream.
+
+Fulltext files are loaded directly from WARC files in petabox, instead of going
+through the wayback replay.
+
+Requires:
+- happybase
+- mrjob
+- wayback/GWB libraries
+"""
+
+import io
+import sys
+import struct
+import requests
+import happybase
+import mrjob
+from mrjob.job import MRJob
+from wayback.resource import Resource
+from wayback.resource import ArcResource
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory
+
+
+def parse_cdx_line(raw_cdx):
+
+ cdx = raw_cdx.split()
+ if len(cdx) < 11:
+ return None
+
+ surt = cdx[0]
+ dt = cdx[1]
+ url = cdx[2]
+ mime = normalize_mime(cdx[3])
+ http_status = cdx[4]
+ key = cdx[5]
+ c_size = cdx[8]
+ offset = cdx[9]
+ warc = cdx[10]
+
+ if not (key.isalnum() and c_size.isdigit() and offset.isdigit()
+ and http_status == "200" and len(key) == 32 and dt.isdigit()):
+ return None
+
+ if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc):
+ return None
+
+ key = "sha1:{}".format(key)
+
+ info = dict(surt=surt, dt=dt, url=url, c_size=int(c_size),
+ offset=int(offset), warc=warc)
+
+ warc_file = warc.split('/')[-1]
+ dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+ try:
+ dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat()
+ except:
+ return None
+
+ # 'i' intentionally not set
+ heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1)
+ return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix}
+
+
+class MRExtractCdxGrobid(MrJob):
+
+ # CDX lines in; JSON status out
+ INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
+ OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol
+
+ def configure_args(self):
+ super(MRExtractCdxGrobid, self).configure_args()
+
+ self.add_passthru_arg('--hbase-table',
+ type=str,
+ default='wbgrp-journal-extract-0-qa',
+ help='HBase table to backfill into (must exist)')
+ self.add_passthru_arg('--hbase-host',
+ type=str,
+ default='localhost',
+ help='HBase thrift API host to connect to')
+ self.add_passthru_arg('--grobid-uri',
+ type=str,
+ default='http://localhost:8070',
+ help='URI of GROBID API Server')
+
+ def __init__(self, *args, **kwargs):
+
+ # Allow passthrough for tests
+ if 'hb_table' in kwargs:
+ self.hb_table = kwargs.pop('hb_table')
+ else:
+ self.hb_table = None
+
+ super(MRExtractCdxGrobid, self).__init__(*args, **kwargs)
+ self.mime_filter = ['application/pdf']
+
+ def grobid_fulltext(self, content):
+ r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument",
+ files={'input': content})
+ if r.status_code is not 200:
+ # XXX:
+ print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8')))
+ else:
+ # XXX:
+ print("SUCCESS: " + debug_line)
+ return r.json()
+
+ def mapper_init(self):
+
+ if self.hb_table is None:
+ try:
+ host = self.options.hbase_host
+ # TODO: make these configs accessible from... mrconf.cfg?
+ hb_conn = happybase.Connection(host=host, transport="framed",
+ protocol="compact")
+ except Exception as err:
+ raise Exception("Couldn't connect to HBase using host: {}".format(host))
+ self.hb_table = hb_conn.table(self.options.hbase_table)
+
+ def parse_line(self, raw_cdx):
+
+ if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or
+ raw_cdx.startswith('#')):
+ return None, dict(status="invalid", reason="line prefix")
+
+ info = parse_cdx_line(raw_cdx)
+ if info is None:
+ return None, dict(status="invalid", reason="CDX parse")
+
+ if info['file:mime'] not in self.mime_filter:
+ self.increment_counter('lines', 'skip')
+ return None, dict(status="skip", reason="mimetype")
+
+ # If warc is not item/file.(w)arc.gz form, skip it
+ if len(info['file:cdx']['warc'].split('/')) != 2:
+ self.increment_counter('lines', 'skip')
+ return None, dict(status="skip", reason="WARC path not petabox item/file")
+
+ return info, None
+
+ def extract(self, info):
+
+ # Fetch data from WARCs in petabox
+ try:
+ rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
+ gwb_record = rstore.load_resource(
+ info['file:cdx']['warc'],
+ info['file:cdx']['offset'],
+ info['file:cdx']['c_size'])
+ except IOError as ioe:
+ # XXX: catch correct error
+ self.increment_counter('lines', 'existing')
+ return _, dict(status="existing")
+
+ if gwb_record.get_status()[0] != 200:
+ self.increment_counter('lines', 'error')
+ return _, dict(status="error", reason="non-HTTP-200 WARC content")
+
+ # Submit to GROBID
+ content = gwb_record.open_raw_content()
+ try:
+ grobid_result = self.grobid_fulltext(gwb_record.open_raw_content())
+ except IOError as ioe:
+ # XXX: catch correct error
+ self.increment_counter('lines', 'existing')
+ return _, dict(status="existing")
+
+ info['file:size'] = len(resource_data)
+
+ info['grobid0:status_code'] = None
+ info['grobid0:quality'] = None
+ info['grobid0:status'] = {}
+ info['grobid0:tei_xml'] = None
+ info['grobid0:tei_json'] = {}
+ info['grobid0:metadata'] = {}
+
+ # Convert TEI XML to JSON
+ # TODO
+
+ # Determine extraction "quality"
+ # TODO
+
+ return info, None
+
+ def mapper(self, _, raw_cdx):
+ """
+ 1. parse CDX line
+ 2. check what is already in hbase
+ 3. fetch data from wayback
+ 4. submit to GROBID
+ 5. convert GROBID response to JSON (and metadata)
+ 6. determine "quality"
+ 7. push results to hbase
+ """
+
+ self.increment_counter('lines', 'total')
+
+ # Parse line and filter down
+ info, status = self.parse_line(raw_cdx)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ return _, status
+
+ # Check if we've already processed this line
+ oldrow = self.hb_table.get(info['key'], columns=['f', 'file',
+ 'grobid:status_code'])
+ if row.get('grobid0:status', None):
+ # This file has already been processed; skip it
+ self.increment_counter('lines', 'existing')
+ return _, dict(status="existing")
+
+ # Do the extraction
+ info, status = self.extract(info)
+ if info is None:
+ self.increment_counter('lines', status['status'])
+ return _, status
+
+ # Decide what to bother inserting back into HBase
+ # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx')
+ grobid_status = info.get('grobid0:status_code', None)
+ for k in info.keys():
+ if k in oldrow:
+ info.pop(k)
+
+ # Convert fields to binary
+ for k in info.keys():
+ if k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json',
+ 'grobid0:metadata'):
+ assert type(info[k]) == dict
+ info[k] = json.dumps(info[k], sort_keys=True, indent=None)
+ if k in ('file:size', 'grobid0:status_code'):
+ # encode as int64 in network byte order
+ info[k] = struct.pack('!q', info[k])
+
+ key = info.pop('key')
+ self.hb_table.put(key, info)
+ self.increment_counter('lines', 'success')
+
+ yield _, dict(status="success", grobid_status=grobid_status)
+
+if __name__ == '__main__':
+ MRExtractCdxGrobid.run()
+
diff --git a/extraction/tests/test_extraction_cdx_grobid.py b/extraction/tests/test_extraction_cdx_grobid.py
new file mode 100644
index 0000000..71b55a3
--- /dev/null
+++ b/extraction/tests/test_extraction_cdx_grobid.py
@@ -0,0 +1,169 @@
+
+import io
+import json
+import pytest
+import mrjob
+import responses
+import happybase_mock
+from extraction_cdx_grobid import MRExtractCDXGROBID
+
+
+
+def test_parse_cdx_line():
+
+ raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz"
+ correct = {
+ 'key': "sha1:WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G",
+ 'file:mime': "application/pdf",
+ 'file:cdx': {
+ 'surt': "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf",
+ 'url': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf",
+ 'dt': "20170828233154",
+ 'warc': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+ 'offset': 931661233,
+ 'c_size': 210251,
+ },
+ 'f:c': {
+ 'u': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf",
+ 'd': "2017-08-28T23:31:54",
+ 'f': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz",
+ 'o': 931661233,
+ 'c': 1,
+ }
+ }
+
+ assert transform_line(raw) == correct
+ assert transform_line(raw + "\n") == correct
+ assert transform_line(raw + " extra_field") == correct
+
+@pytest.fixture
+def job():
+ """
+ Note: this mock only seems to work with job.run_mapper(), not job.run();
+ the later results in a separate instantiation without the mock?
+ """
+ conn = happybase_mock.Connection()
+ conn.create_table('wbgrp-journal-extract-test',
+ {'file': {}, 'grobid0': {}, 'f': {}})
+ table = conn.table('wbgrp-journal-extract-test')
+
+ job = MRCDXBackfillHBase(['--no-conf', '-'], hb_table=table)
+ return job
+
+
+@responses.activate
+def test_mapper_lines(job):
+
+ fake_grobid = {}
+ responses.add(responses.POST, 'http://localhost:9070/api/processFulltextDocument', status=200,
+ body=json.dumps(fake_grobid), content_type='application/json')
+
+ raw = io.BytesIO(b"""
+com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
+eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
+com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz
+""")
+
+ job.sandbox(stdin=raw)
+ job.run_mapper()
+
+ # wayback gets FETCH 1x times
+
+ # grobid gets POST 3x times
+
+ # hbase
+
+
+ assert job.hb_table.row(b'1') == {}
+ # HTTP 301
+ assert job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {}
+ # valid
+ assert job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') != {}
+ # text/plain
+ assert job.hb_table.row(b'sha1:6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD') == {}
+
+ row = job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J')
+
+ assert struct.unpack("", row[b'file:size']) == 12345
+ assert row[b'file:mime'] == b"application/pdf"
+ assert struct.unpack("", row[b'grobid0:status_code']) == 200
+ assert row[b'grobid0:quality'] == None # TODO
+ status = json.loads(row[b'grobid0:status'].decode('utf-8'))
+ assert type(row[b'grobid0:status']) == type(dict())
+ assert row[b'grobid0:tei_xml'] == "<xml><lorem>ipsum</lorem></xml>"
+ tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8'))
+ metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8'))
+
+def test_parse_cdx_invalid(job):
+
+ print("valid")
+ raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ info, status = job.parse_line(raw)
+ assert status is None
+
+ print("space-prefixed line")
+ raw = io.BytesIO(b" com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ info, status = job.parse_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+ assert 'prefix' in status['reason']
+
+ print("commented line")
+ raw = io.BytesIO(b"#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ info, status = job.parse_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+ assert 'prefix' in status['reason']
+
+ print("wrong column count")
+ raw = io.BytesIO(b"a b c d")
+ info, status = job.parse_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+ assert 'parse' in status['reason']
+
+ print("missing mimetype")
+ raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ info, status = job.parse_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+ assert 'parse' in status['reason']
+
+ print("HTTP status")
+ raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 501 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ info, status = job.parse_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+
+ print("datetime")
+ raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 501 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ info, status = job.parse_line(raw)
+ assert info is None
+ assert status['status'] == "invalid"
+
+
+def test_parse_cdx_skip(job):
+
+ print("warc format")
+ raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ info, status = job.mapper(raw)
+ assert info is None
+ assert status['status'] == "skip"
+ assert 'WARC' in status['reason']
+
+ print("mimetype")
+ raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz")
+ info, status = job.mapper(raw)
+ assert info is None
+ assert status['status'] == "skip"
+ assert 'mimetype' in status['reason']
+
+def test_tei_json_convert():
+ # TODO: load xml test vector, run it through
+ with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f:
+ xml_content = f.read()
+ pass
+
+def test_tei_json_convert_invalid():
+ # TODO: pass in junk
+ pass
diff --git a/extraction/xml2json.py b/extraction/xml2json.py
new file mode 100644
index 0000000..f956014
--- /dev/null
+++ b/extraction/xml2json.py
@@ -0,0 +1,8 @@
+
+import json
+import sys
+import xmltodict
+
+with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'rb') as f:
+ thing = xmltodict.parse(f, process_namespaces=False)
+ print(json.dumps(thing))