diff options
Diffstat (limited to 'mapreduce/cdx-record-pipeline')
| -rw-r--r-- | mapreduce/cdx-record-pipeline/README.md | 33 | ||||
| -rwxr-xr-x | mapreduce/cdx-record-pipeline/cdx-record-pipeline.py | 67 | ||||
| -rw-r--r-- | mapreduce/cdx-record-pipeline/requirements.txt | 3 | 
3 files changed, 103 insertions, 0 deletions
diff --git a/mapreduce/cdx-record-pipeline/README.md b/mapreduce/cdx-record-pipeline/README.md new file mode 100644 index 0000000..797b8eb --- /dev/null +++ b/mapreduce/cdx-record-pipeline/README.md @@ -0,0 +1,33 @@ +CDX Record Pipeline (GrobId Edition) +===================================== + +Hadoop based pipeline to process PDFs from a specified IA CDX dataset + +## Local mode example ## + +``` +cat -n /home/bnewbold/100k_random_gwb_pdf.cdx | ./cdx-record-pipeline.py +  +``` + +## Cluster mode example ## + +``` +input=100k_random_gwb_pdf.cdx +output=100k_random_gwb_pdf.out +lines_per_map=1000 + +hadoop jar /home/webcrawl/hadoop-2/hadoop-mapreduce/hadoop-streaming.jar +	-archives "hdfs://ia802400.us.archive.org:6000/lib/cdx-record-pipeline-venv.zip#cdx-record-pipeline-venv" +	-D mapred.reduce.tasks=0 +	-D mapred.job.name=Cdx-Record-Pipeline +	-D mapreduce.job.queuename=extraction +	-D mapred.line.input.format.linespermap=${lines_per_map}  +	-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat  +	-input ${input} +	-output ${output} +	-mapper cdx-record-pipeline.py +	-file cdx-record-pipeline.py + +``` + diff --git a/mapreduce/cdx-record-pipeline/cdx-record-pipeline.py b/mapreduce/cdx-record-pipeline/cdx-record-pipeline.py new file mode 100755 index 0000000..9e521bf --- /dev/null +++ b/mapreduce/cdx-record-pipeline/cdx-record-pipeline.py @@ -0,0 +1,67 @@ +#!./cdx-record-pipeline-venv/bin/python +''' +GrobId PDF Pipeline Test +Read in CDX lines and query GROBID server for each PDF resource +TODO: Testing / HBase integration -- Bryan will update as needed +''' +import os +import re +import sys +import base64 +import hashlib +import urllib +import urlparse +import re +import string +from wayback.resource import Resource +from wayback.resource import ArcResource +from wayback.resourcestore import ResourceStore +from gwb.loader import CDXLoaderFactory +from StringIO import StringIO +import requests +import sys + +def process_pdf_using_grobid(content_buffer, debug_line): +    """Query GrobId server & process response +    """ +    GROBID_SERVER="http://wbgrp-svc096.us.archive.org:8070" +    content = content_buffer.read() +    r = requests.post(GROBID_SERVER + "/api/processFulltextDocument", +            files={'input': content}) +    if r.status_code is not 200: +        print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8'), debug_line)) +    else: +        print("SUCCESS: " + debug_line) + +class Cdx_Record_Pipeline(object): + +    def read_cdx_and_parse(self, parser_func, accepted_mimes = []): +        """Read in CDX lines and process PDF records fetched over HTTP +        """ +        rstore = ResourceStore(loaderfactory=CDXLoaderFactory())  +        for line in sys.stdin: +            line = line.rstrip() +            cdx_line = line.split() +            #ignoring NLine offset +            if len(cdx_line) != 12: +                continue +            cdx_line = cdx_line[1:] +            (src_url, timestamp, mime, record_location, record_offset, record_length) = (cdx_line[2], cdx_line[1], cdx_line[3], cdx_line[-1], cdx_line[-2], cdx_line[-3]) +            if '-' == record_length or not record_location.endswith('arc.gz') or mime not in accepted_mimes: +                continue +            orig_url = cdx_line[2] +            debug_line = ' '.join(cdx_line) +            try: +                record_location = 'http://archive.org/download/' + record_location +                record_offset = int(record_offset) +                record_length = int(record_length) +                resource_data = rstore.load_resource(record_location, record_offset, record_length) +                parser_func(resource_data.open_raw_content(), debug_line) +            except: +                continue +          +# main() +#_______________________________________________________________________________ +if __name__ == '__main__': +    cdx_record_pipeline = Cdx_Record_Pipeline() +    cdx_record_pipeline.read_cdx_and_parse(process_pdf_using_grobid, ['application/pdf', 'application/x-pdf']) diff --git a/mapreduce/cdx-record-pipeline/requirements.txt b/mapreduce/cdx-record-pipeline/requirements.txt new file mode 100644 index 0000000..17b803f --- /dev/null +++ b/mapreduce/cdx-record-pipeline/requirements.txt @@ -0,0 +1,3 @@ +--extra-index-url https://devpi.archive.org/wb/prod/ +wayback==0.2.1.2 +GlobalWayback==0.3  | 
