From e8eb959fbdd5d13cd53421ddf2487811d049c4e8 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 4 Apr 2018 11:47:41 -0700 Subject: more WIP on extractor --- extraction/TODO | 2 + extraction/extraction.py | 52 ------ extraction/extraction_cdx_grobid.py | 248 +++++++++++++++++++++++++ extraction/tests/test_extraction_cdx_grobid.py | 169 +++++++++++++++++ extraction/xml2json.py | 8 + 5 files changed, 427 insertions(+), 52 deletions(-) create mode 100644 extraction/TODO delete mode 100644 extraction/extraction.py create mode 100755 extraction/extraction_cdx_grobid.py create mode 100644 extraction/tests/test_extraction_cdx_grobid.py create mode 100644 extraction/xml2json.py diff --git a/extraction/TODO b/extraction/TODO new file mode 100644 index 0000000..ed10834 --- /dev/null +++ b/extraction/TODO @@ -0,0 +1,2 @@ +- abstract CDX line reading and HBase stuff out into a common library +- actual GROBID_SERVER="http://wbgrp-svc096.us.archive.org:8070" diff --git a/extraction/extraction.py b/extraction/extraction.py deleted file mode 100644 index cdca433..0000000 --- a/extraction/extraction.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python3 - -import io -import sys -import requests -#import happybase -import mrjob -from mrjob.job import MRJob -from wayback.resource import Resource -from wayback.resource import ArcResource -from wayback.resourcestore import ResourceStore -from gwb.loader import CDXLoaderFactory - - -def process_pdf_using_grobid(content_buffer, debug_line): - """Query GrobId server & process response - """ - GROBID_SERVER="http://wbgrp-svc096.us.archive.org:8070" - content = content_buffer.read() - r = requests.post(GROBID_SERVER + "/api/processFulltextDocument", - files={'input': content}) - if r.status_code is not 200: - print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8'), debug_line)) - else: - print("SUCCESS: " + debug_line) - -class Cdx_Record_Pipeline(object): - - def read_cdx_and_parse(self, parser_func, accepted_mimes = []): - """Read in CDX lines and process PDF records fetched over HTTP - """ - rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) - for line in sys.stdin: - line = line.rstrip() - cdx_line = line.split() - #ignoring NLine offset - if len(cdx_line) != 12: - continue - cdx_line = cdx_line[1:] - (src_url, timestamp, mime, record_location, record_offset, record_length) = (cdx_line[2], cdx_line[1], cdx_line[3], cdx_line[-1], cdx_line[-2], cdx_line[-3]) - if '-' == record_length or not record_location.endswith('arc.gz') or mime not in accepted_mimes: - continue - orig_url = cdx_line[2] - debug_line = ' '.join(cdx_line) - try: - record_location = 'http://archive.org/download/' + record_location - record_offset = int(record_offset) - record_length = int(record_length) - resource_data = rstore.load_resource(record_location, record_offset, record_length) - parser_func(resource_data.open_raw_content(), debug_line) - except: - continue diff --git a/extraction/extraction_cdx_grobid.py b/extraction/extraction_cdx_grobid.py new file mode 100755 index 0000000..54d8b71 --- /dev/null +++ b/extraction/extraction_cdx_grobid.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python3 +""" +Streaming Hadoop script to import extract metadata and body from fulltext (eg, +PDF) files using GROBID. Input is a CDX file; results primarly go to HBase, +with status written to configurable output stream. + +Fulltext files are loaded directly from WARC files in petabox, instead of going +through the wayback replay. + +Requires: +- happybase +- mrjob +- wayback/GWB libraries +""" + +import io +import sys +import struct +import requests +import happybase +import mrjob +from mrjob.job import MRJob +from wayback.resource import Resource +from wayback.resource import ArcResource +from wayback.resourcestore import ResourceStore +from gwb.loader import CDXLoaderFactory + + +def parse_cdx_line(raw_cdx): + + cdx = raw_cdx.split() + if len(cdx) < 11: + return None + + surt = cdx[0] + dt = cdx[1] + url = cdx[2] + mime = normalize_mime(cdx[3]) + http_status = cdx[4] + key = cdx[5] + c_size = cdx[8] + offset = cdx[9] + warc = cdx[10] + + if not (key.isalnum() and c_size.isdigit() and offset.isdigit() + and http_status == "200" and len(key) == 32 and dt.isdigit()): + return None + + if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): + return None + + key = "sha1:{}".format(key) + + info = dict(surt=surt, dt=dt, url=url, c_size=int(c_size), + offset=int(offset), warc=warc) + + warc_file = warc.split('/')[-1] + dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() + try: + dt_iso = datetime.strptime(dt, "%Y%m%d%H%M%S").isoformat() + except: + return None + + # 'i' intentionally not set + heritrix = dict(u=url, d=dt_iso, f=warc_file, o=int(offset), c=1) + return {'key': key, 'file:mime': mime, 'file:cdx': info, 'f:c': heritrix} + + +class MRExtractCdxGrobid(MrJob): + + # CDX lines in; JSON status out + INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol + OUTPUT_PROTOCOL = mrjob.protocol.JSONValueProtocol + + def configure_args(self): + super(MRExtractCdxGrobid, self).configure_args() + + self.add_passthru_arg('--hbase-table', + type=str, + default='wbgrp-journal-extract-0-qa', + help='HBase table to backfill into (must exist)') + self.add_passthru_arg('--hbase-host', + type=str, + default='localhost', + help='HBase thrift API host to connect to') + self.add_passthru_arg('--grobid-uri', + type=str, + default='http://localhost:8070', + help='URI of GROBID API Server') + + def __init__(self, *args, **kwargs): + + # Allow passthrough for tests + if 'hb_table' in kwargs: + self.hb_table = kwargs.pop('hb_table') + else: + self.hb_table = None + + super(MRExtractCdxGrobid, self).__init__(*args, **kwargs) + self.mime_filter = ['application/pdf'] + + def grobid_fulltext(self, content): + r = requests.post(self.options.grobid_uri + "/api/processFulltextDocument", + files={'input': content}) + if r.status_code is not 200: + # XXX: + print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8'))) + else: + # XXX: + print("SUCCESS: " + debug_line) + return r.json() + + def mapper_init(self): + + if self.hb_table is None: + try: + host = self.options.hbase_host + # TODO: make these configs accessible from... mrconf.cfg? + hb_conn = happybase.Connection(host=host, transport="framed", + protocol="compact") + except Exception as err: + raise Exception("Couldn't connect to HBase using host: {}".format(host)) + self.hb_table = hb_conn.table(self.options.hbase_table) + + def parse_line(self, raw_cdx): + + if (raw_cdx.startswith(' ') or raw_cdx.startswith('filedesc') or + raw_cdx.startswith('#')): + return None, dict(status="invalid", reason="line prefix") + + info = parse_cdx_line(raw_cdx) + if info is None: + return None, dict(status="invalid", reason="CDX parse") + + if info['file:mime'] not in self.mime_filter: + self.increment_counter('lines', 'skip') + return None, dict(status="skip", reason="mimetype") + + # If warc is not item/file.(w)arc.gz form, skip it + if len(info['file:cdx']['warc'].split('/')) != 2: + self.increment_counter('lines', 'skip') + return None, dict(status="skip", reason="WARC path not petabox item/file") + + return info, None + + def extract(self, info): + + # Fetch data from WARCs in petabox + try: + rstore = ResourceStore(loaderfactory=CDXLoaderFactory()) + gwb_record = rstore.load_resource( + info['file:cdx']['warc'], + info['file:cdx']['offset'], + info['file:cdx']['c_size']) + except IOError as ioe: + # XXX: catch correct error + self.increment_counter('lines', 'existing') + return _, dict(status="existing") + + if gwb_record.get_status()[0] != 200: + self.increment_counter('lines', 'error') + return _, dict(status="error", reason="non-HTTP-200 WARC content") + + # Submit to GROBID + content = gwb_record.open_raw_content() + try: + grobid_result = self.grobid_fulltext(gwb_record.open_raw_content()) + except IOError as ioe: + # XXX: catch correct error + self.increment_counter('lines', 'existing') + return _, dict(status="existing") + + info['file:size'] = len(resource_data) + + info['grobid0:status_code'] = None + info['grobid0:quality'] = None + info['grobid0:status'] = {} + info['grobid0:tei_xml'] = None + info['grobid0:tei_json'] = {} + info['grobid0:metadata'] = {} + + # Convert TEI XML to JSON + # TODO + + # Determine extraction "quality" + # TODO + + return info, None + + def mapper(self, _, raw_cdx): + """ + 1. parse CDX line + 2. check what is already in hbase + 3. fetch data from wayback + 4. submit to GROBID + 5. convert GROBID response to JSON (and metadata) + 6. determine "quality" + 7. push results to hbase + """ + + self.increment_counter('lines', 'total') + + # Parse line and filter down + info, status = self.parse_line(raw_cdx) + if info is None: + self.increment_counter('lines', status['status']) + return _, status + + # Check if we've already processed this line + oldrow = self.hb_table.get(info['key'], columns=['f', 'file', + 'grobid:status_code']) + if row.get('grobid0:status', None): + # This file has already been processed; skip it + self.increment_counter('lines', 'existing') + return _, dict(status="existing") + + # Do the extraction + info, status = self.extract(info) + if info is None: + self.increment_counter('lines', status['status']) + return _, status + + # Decide what to bother inserting back into HBase + # Particularly: ('f:c', 'file:mime', 'file:size', 'file:cdx') + grobid_status = info.get('grobid0:status_code', None) + for k in info.keys(): + if k in oldrow: + info.pop(k) + + # Convert fields to binary + for k in info.keys(): + if k in ('f:c', 'file:cdx', 'grobid0:status', 'grobid0:tei_json', + 'grobid0:metadata'): + assert type(info[k]) == dict + info[k] = json.dumps(info[k], sort_keys=True, indent=None) + if k in ('file:size', 'grobid0:status_code'): + # encode as int64 in network byte order + info[k] = struct.pack('!q', info[k]) + + key = info.pop('key') + self.hb_table.put(key, info) + self.increment_counter('lines', 'success') + + yield _, dict(status="success", grobid_status=grobid_status) + +if __name__ == '__main__': + MRExtractCdxGrobid.run() + diff --git a/extraction/tests/test_extraction_cdx_grobid.py b/extraction/tests/test_extraction_cdx_grobid.py new file mode 100644 index 0000000..71b55a3 --- /dev/null +++ b/extraction/tests/test_extraction_cdx_grobid.py @@ -0,0 +1,169 @@ + +import io +import json +import pytest +import mrjob +import responses +import happybase_mock +from extraction_cdx_grobid import MRExtractCDXGROBID + + + +def test_parse_cdx_line(): + + raw = "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf 20170828233154 https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf application/pdf 200 WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G - - 210251 931661233 SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz" + correct = { + 'key': "sha1:WL3FEA62TEU4F52Y5DOVQ62VET4QJW7G", + 'file:mime': "application/pdf", + 'file:cdx': { + 'surt': "edu,upenn,ldc)/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", + 'url': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", + 'dt': "20170828233154", + 'warc': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828231135742-00000-00009-wbgrp-svc284/SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", + 'offset': 931661233, + 'c_size': 210251, + }, + 'f:c': { + 'u': "https://www.ldc.upenn.edu/sites/www.ldc.upenn.edu/files/medar2009-large-arabic-broadcast-collection.pdf", + 'd': "2017-08-28T23:31:54", + 'f': "SEMSCHOLAR-PDF-CRAWL-2017-08-04-20170828232253025-00005-3480~wbgrp-svc284.us.archive.org~8443.warc.gz", + 'o': 931661233, + 'c': 1, + } + } + + assert transform_line(raw) == correct + assert transform_line(raw + "\n") == correct + assert transform_line(raw + " extra_field") == correct + +@pytest.fixture +def job(): + """ + Note: this mock only seems to work with job.run_mapper(), not job.run(); + the later results in a separate instantiation without the mock? + """ + conn = happybase_mock.Connection() + conn.create_table('wbgrp-journal-extract-test', + {'file': {}, 'grobid0': {}, 'f': {}}) + table = conn.table('wbgrp-journal-extract-test') + + job = MRCDXBackfillHBase(['--no-conf', '-'], hb_table=table) + return job + + +@responses.activate +def test_mapper_lines(job): + + fake_grobid = {} + responses.add(responses.POST, 'http://localhost:9070/api/processFulltextDocument', status=200, + body=json.dumps(fake_grobid), content_type='application/json') + + raw = io.BytesIO(b""" +com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 301 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz +eu,eui,cadmus)/bitstream/handle/1814/36635/rscas_2015_03.pdf;jsessionid=761393014319a39f40d32ae3eb3a853f?sequence=1 20170705062202 http://cadmus.eui.eu/bitstream/handle/1814/36635/RSCAS_2015_03.pdf%3Bjsessionid%3D761393014319A39F40D32AE3EB3A853F?sequence%3D1 application/PDF 200 MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J - - 854156 328850624 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz +com,pbworks,educ333b)/robots.txt 20170705063311 http://educ333b.pbworks.com/robots.txt text/plain 200 6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD - - 638 398190140 CITESEERX-CRAWL-2017-06-20-20170705062707827-00049-00058-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705063158203-00053-31209~wbgrp-svc284.us.archive.org~8443.warc.gz +""") + + job.sandbox(stdin=raw) + job.run_mapper() + + # wayback gets FETCH 1x times + + # grobid gets POST 3x times + + # hbase + + + assert job.hb_table.row(b'1') == {} + # HTTP 301 + assert job.hb_table.row(b'sha1:3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ') == {} + # valid + assert job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') != {} + # text/plain + assert job.hb_table.row(b'sha1:6VAUYENMOU2SK2OWNRPDD6WTQTECGZAD') == {} + + row = job.hb_table.row(b'sha1:MPCXVWMUTRUGFP36SLPHKDLY6NGU4S3J') + + assert struct.unpack("", row[b'file:size']) == 12345 + assert row[b'file:mime'] == b"application/pdf" + assert struct.unpack("", row[b'grobid0:status_code']) == 200 + assert row[b'grobid0:quality'] == None # TODO + status = json.loads(row[b'grobid0:status'].decode('utf-8')) + assert type(row[b'grobid0:status']) == type(dict()) + assert row[b'grobid0:tei_xml'] == "ipsum" + tei_json = json.loads(row[b'grobid0:tei_json'].decode('utf-8')) + metadata = json.loads(row[b'grobid0:metadata'].decode('utf-8')) + +def test_parse_cdx_invalid(job): + + print("valid") + raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz") + info, status = job.parse_line(raw) + assert status is None + + print("space-prefixed line") + raw = io.BytesIO(b" com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz") + info, status = job.parse_line(raw) + assert info is None + assert status['status'] == "invalid" + assert 'prefix' in status['reason'] + + print("commented line") + raw = io.BytesIO(b"#com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz") + info, status = job.parse_line(raw) + assert info is None + assert status['status'] == "invalid" + assert 'prefix' in status['reason'] + + print("wrong column count") + raw = io.BytesIO(b"a b c d") + info, status = job.parse_line(raw) + assert info is None + assert status['status'] == "invalid" + assert 'parse' in status['reason'] + + print("missing mimetype") + raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz") + info, status = job.parse_line(raw) + assert info is None + assert status['status'] == "invalid" + assert 'parse' in status['reason'] + + print("HTTP status") + raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 501 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz") + info, status = job.parse_line(raw) + assert info is None + assert status['status'] == "invalid" + + print("datetime") + raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 501 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz") + info, status = job.parse_line(raw) + assert info is None + assert status['status'] == "invalid" + + +def test_parse_cdx_skip(job): + + print("warc format") + raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705062200 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz") + info, status = job.mapper(raw) + assert info is None + assert status['status'] == "skip" + assert 'WARC' in status['reason'] + + print("mimetype") + raw = io.BytesIO(b"com,sagepub,cep)/content/28/9/960.full.pdf 20170705 http://cep.sagepub.com/content/28/9/960.full.pdf application/pdf 200 3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ - - 401 313356621 CITESEERX-CRAWL-2017-06-20-20170705061647307-00039-00048-wbgrp-svc284/CITESEERX-CRAWL-2017-06-20-20170705062052659-00043-31209~wbgrp-svc284.us.archive.org~8443.warc.gz") + info, status = job.mapper(raw) + assert info is None + assert status['status'] == "skip" + assert 'mimetype' in status['reason'] + +def test_tei_json_convert(): + # TODO: load xml test vector, run it through + with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'r') as f: + xml_content = f.read() + pass + +def test_tei_json_convert_invalid(): + # TODO: pass in junk + pass diff --git a/extraction/xml2json.py b/extraction/xml2json.py new file mode 100644 index 0000000..f956014 --- /dev/null +++ b/extraction/xml2json.py @@ -0,0 +1,8 @@ + +import json +import sys +import xmltodict + +with open('tests/files/23b29ea36382680716be08fc71aa81bd226e8a85.xml', 'rb') as f: + thing = xmltodict.parse(f, process_namespaces=False) + print(json.dumps(thing)) -- cgit v1.2.3